, String> topicForMessage() {
+ return message -> (String) message.getHeaders().get(PulsarHeaders.TOPIC_NAME);
+ }
+
+ @PulsarListener
+ public void listen(Message> message) {
+ super.listen(message);
+ }
+ }
+
+ static class PulsarTemplateTopicSender implements TestTopicSender {
+
+ private final PulsarTemplate pulsarTemplate;
+
+ PulsarTemplateTopicSender(PulsarTemplate pulsarTemplate) {
+ this.pulsarTemplate = pulsarTemplate;
+ }
+
+ @Override
+ public void send(String topic, P payload) {
+ doSend(topic, payload);
+ }
+
+ @Override
+ public void send(String topic, Message> message) {
+ doSend(topic, message);
+ }
+
+ private void doSend(String topic, Object payload) {
+ pulsarTemplate.send(topic, payload);
+ }
+ }
+
+}
diff --git a/applications/stream-applications-core/stream-applications-test-support/src/main/java/org/springframework/cloud/stream/app/test/integration/pulsar/PulsarStreamApps.java b/applications/stream-applications-core/stream-applications-test-support/src/main/java/org/springframework/cloud/stream/app/test/integration/pulsar/PulsarStreamApps.java
new file mode 100644
index 000000000..b89f35409
--- /dev/null
+++ b/applications/stream-applications-core/stream-applications-test-support/src/main/java/org/springframework/cloud/stream/app/test/integration/pulsar/PulsarStreamApps.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.app.test.integration.pulsar;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.testcontainers.containers.GenericContainer;
+
+import org.springframework.cloud.stream.app.test.integration.StreamApps;
+/**
+ * Configures an end to end Stream (source, processor(s), sink) using
+ * {@link PulsarStreamAppContainer}s.
+ * @author David Turanski
+ */
+public class PulsarStreamApps extends StreamApps {
+
+ protected PulsarStreamApps(GenericContainer sourceContainer, List processorContainers, GenericContainer sinkContainer) {
+ super(sourceContainer, processorContainers, sinkContainer);
+ }
+
+ public static Builder pulsarStreamApps(String streamName, GenericContainer messageBrokerContainer) {
+ return new PulsarBuilder(streamName, messageBrokerContainer);
+ }
+
+ public static final class PulsarBuilder extends Builder {
+
+ protected PulsarBuilder(String streamName, GenericContainer messageBrokerContainer) {
+ super(streamName, messageBrokerContainer);
+ }
+
+ protected Map binderProperties() {
+ return Collections.singletonMap("SPRING_CLOUD_STREAM_PULSAR_BINDER_BROKERS",
+ messageBrokerContainer.getNetworkAliases().get(0) + ":6065");
+ }
+
+ @Override
+ protected PulsarStreamApps doBuild(GenericContainer sourceContainer, List processorContainers, GenericContainer sinkContainer) {
+ return new PulsarStreamApps(sourceContainer, processorContainers, sinkContainer);
+ }
+ }
+}
diff --git a/run-ITs.sh b/run-ITs.sh
index b88d626b1..daf9e21a3 100755
--- a/run-ITs.sh
+++ b/run-ITs.sh
@@ -11,7 +11,7 @@ if [ "$STREAM_APPS_VERSION" == "" ]; then
STREAM_APPS_VERSION=$($SCDIR/mvn-get-version.sh)
fi
CONTAINERS="s3-source sftp-source http-request-processor log-sink jdbc-source time-source http-source tcp-sink mongodb-sink"
-BROKERS="rabbit kafka"
+BROKERS="rabbit kafka pulsar"
for container in $CONTAINERS; do
for broker in $BROKERS; do
echo "Pulling springcloudstream/${container}-${broker}:$STREAM_APPS_VERSION"
diff --git a/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/java/org/springframework/cloud/dataflow/app/plugin/SpringCloudStreamAppGeneratorMojoTest.java b/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/java/org/springframework/cloud/dataflow/app/plugin/SpringCloudStreamAppGeneratorMojoTest.java
index 451c4d773..777e88480 100644
--- a/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/java/org/springframework/cloud/dataflow/app/plugin/SpringCloudStreamAppGeneratorMojoTest.java
+++ b/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/java/org/springframework/cloud/dataflow/app/plugin/SpringCloudStreamAppGeneratorMojoTest.java
@@ -101,9 +101,16 @@ public void prepareForTest() throws NoSuchFieldException {
rabbitDep.setArtifactId("spring-cloud-stream-binder-rabbit");
rabbitBinder.getMaven().getDependencies().add(rabbitDep);
+ SpringCloudStreamAppGeneratorMojo.Binder pulsarBinder = new SpringCloudStreamAppGeneratorMojo.Binder();
+ Dependency pulsarDep = new Dependency();
+ pulsarDep.setGroupId("org.springframework.cloud");
+ pulsarDep.setArtifactId("spring-cloud-stream-binder-pulsar");
+ pulsarBinder.getMaven().getDependencies().add(pulsarDep);
+
Map binders = new HashMap<>();
binders.put("kafka", kafkaBinder);
binders.put("rabbit", rabbitBinder);
+ binders.put("pulsar", pulsarBinder);
setMojoProperty("binders", binders);
diff --git a/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/resources/unit/http-source-apps/pom.xml b/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/resources/unit/http-source-apps/pom.xml
index 28ad2437c..1efb21e2e 100644
--- a/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/resources/unit/http-source-apps/pom.xml
+++ b/spring-cloud-dataflow-apps-plugin/spring-cloud-dataflow-apps-generator-plugin/src/test/resources/unit/http-source-apps/pom.xml
@@ -208,6 +208,26 @@