Skip to content

Added pulsar binder support. #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/stream-apps-release-files-spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@
{
"name": {"$match": "*.jar"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.stream-apps-pulsar-maven"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.stream-apps-pulsar-docker"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.stream-apps-pulsar-harbor"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.stream-apps-kafka-maven"}
},
Expand All @@ -38,6 +47,9 @@
{
"name": {"$match": "*stream-applications-descriptor-*.stream-apps-rabbit-harbor"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.pulsar-apps-maven-repo-url.properties"}
},
{
"name": {"$match": "*stream-applications-descriptor-*.kafka-apps-maven-repo-url.properties"}
},
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -732,10 +732,14 @@ jobs:
jfrog rt upload "$SRC_ROOT/kafka-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-maven"
jfrog rt upload "$SRC_ROOT/kafka-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-docker"
jfrog rt upload "$SRC_ROOT/kafka-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-harbor"
jfrog rt upload "$SRC_ROOT/pulsar-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-maven"
jfrog rt upload "$SRC_ROOT/pulsar-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-docker"
jfrog rt upload "$SRC_ROOT/pulsar-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-harbor"
jfrog rt upload "$SRC_ROOT/rabbit-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-maven"
jfrog rt upload "$SRC_ROOT/rabbit-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-docker"
jfrog rt upload "$SRC_ROOT/rabbit-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-harbor"
jfrog rt upload "$SRC_ROOT/kafka-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.kafka-apps-maven-repo-url.properties"
jfrog rt upload "$SRC_ROOT/pulsar-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.pulsar-apps-maven-repo-url.properties"
jfrog rt upload "$SRC_ROOT/rabbit-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.rabbit-apps-maven-repo-url.properties"

- name: Set Stream Applications Docs Properties for ${{ needs.parameters.outputs.release_train_version }}
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/descriptor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,13 @@ jobs:
jfrog rt upload "$SRC_ROOT/kafka-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-maven"
jfrog rt upload "$SRC_ROOT/kafka-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-docker"
jfrog rt upload "$SRC_ROOT/kafka-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-kafka-harbor"
jfrog rt upload "$SRC_ROOT/pulsar-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-maven"
jfrog rt upload "$SRC_ROOT/pulsar-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-docker"
jfrog rt upload "$SRC_ROOT/pulsar-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-pulsar-harbor"
jfrog rt upload "$SRC_ROOT/rabbit-apps-maven.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-maven"
jfrog rt upload "$SRC_ROOT/rabbit-apps-docker.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-docker"
jfrog rt upload "$SRC_ROOT/rabbit-apps-harbor.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.stream-apps-rabbit-harbor"
jfrog rt upload "$SRC_ROOT/kafka-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.kafka-apps-maven-repo-url.properties"
jfrog rt upload "$SRC_ROOT/pulsar-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.pulsar-apps-maven-repo-url.properties"
jfrog rt upload "$SRC_ROOT/rabbit-apps-maven-repo-url.properties" "${TARGET_REPO}/stream-applications-descriptor-${RELEASE_TRAIN_VERSION}.rabbit-apps-maven-repo-url.properties"
popd > /dev/null
10 changes: 10 additions & 0 deletions applications/stream-applications-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@
</maven>
</application>
<binders>
<pulsar>
<maven>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-pulsar</artifactId>
</dependency>
</dependencies>
</maven>
</pulsar>
<kafka>
<maven>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
Expand All @@ -62,6 +66,11 @@
<artifactId>kafka</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.StreamAppContainerTestUtils;
import org.springframework.cloud.stream.app.test.integration.kafka.KafkaStreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.pulsar.PulsarStreamAppContainer;
import org.springframework.cloud.stream.app.test.integration.rabbitmq.RabbitMQStreamAppContainer;
import org.springframework.core.annotation.AnnotatedElementUtils;

Expand Down Expand Up @@ -59,6 +60,9 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext ext
String version = getVersion(annotation);

switch (annotation.binder()) {
case Pulsar:
baseContainer = new PulsarStreamAppContainer(
StreamAppContainerTestUtils.imageName(annotation.repository(), annotation.name(), version));
case Kafka:
baseContainer = new KafkaStreamAppContainer(
StreamAppContainerTestUtils.imageName(annotation.repository(), annotation.name(), version));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ public enum Binder {
* RabbitMQ Binder.
*/
RabbitMQ,


/**
* Pulsar Binder.
*/
Pulsar,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.app.test.integration.pulsar;

import java.time.Duration;

import org.testcontainers.containers.Network;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Initializes and starts a {@link PulsarContainer}.
*
* @author David Turanski
* @author Artem Bilan
* @author Corneil du Plessis
*/
public abstract class PulsarConfig {


/**
* The PulsarContainer.
*/
public final static PulsarContainer pulsar = new PulsarContainer(
DockerImageName.parse("apachepulsar/pulsar-all"))
.withExposedPorts(6065, 8080)
.withNetwork(Network.SHARED)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);

static {
pulsar.start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.app.test.integration.pulsar;

import org.springframework.cloud.stream.app.test.integration.StreamAppContainer;

/**
* An implementation of
* {@link StreamAppContainer} for
* pulsar. This provides the required broker connection properties.
* @author David Turanski
*/
public class PulsarStreamAppContainer extends StreamAppContainer {

/**
* @param imageName the image name.
*/
public PulsarStreamAppContainer(String imageName) {
super(imageName, PulsarConfig.pulsar);
}

@Override
protected StreamAppContainer withBinderProperties() {
this.withEnv("SPRING_CLOUD_STREAM_PULSAR_BINDER_BROKERS",
messageBrokerContainer.getNetworkAliases().get(0) + ":6065");
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.app.test.integration.pulsar;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener;
import org.springframework.cloud.stream.app.test.integration.MessageMatcher;
import org.springframework.cloud.stream.app.test.integration.OutputMatcher;
import org.springframework.cloud.stream.app.test.integration.TestTopicListener;
import org.springframework.cloud.stream.app.test.integration.TestTopicSender;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.support.PulsarHeaders;

import static org.springframework.cloud.stream.app.test.integration.AbstractTestTopicListener.STREAM_APPLICATIONS_TEST_TOPIC;

/**
* Spring configuration for testing {@link PulsarStreamAppContainer}s.
* @author David Turanski
*/
@Configuration
@EnablePulsar
public class PulsarStreamAppContainerTestConfiguration {

private static final String SUFFIX = UUID.randomUUID().toString().substring(0, 8);

private static final String STREAM_APPLICATION_TESTS_GROUP = "stream-application-tests_" + SUFFIX;

@Bean
public PulsarClient pulsarClient(@Value("${spring.pulsar.client.service-url}") String serviceUrl) throws Exception {
return PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
}

@Bean
public PulsarProducerFactory<Object> pulsarProducerFactory(PulsarClient pulsarClient) {
return new DefaultPulsarProducerFactory<>(pulsarClient);
}

@Bean
public PulsarTemplate<Object> pulsarTemplate(PulsarProducerFactory producerFactory) {
return new PulsarTemplate<>(producerFactory);
}

@Bean
public OutputMatcher outputMatcher(TestTopicListener testTopicListener) {
return new OutputMatcher(testTopicListener);
}

@Bean
public TestTopicSender testTopicSender(PulsarTemplate pulsarTemplate) {
return new PulsarTemplateTopicSender(pulsarTemplate);
}

@Bean("pulsarConsumerFactory")
public PulsarConsumerFactory<String> consumerFactory(PulsarClient pulsarClient) {
Map<String, Object> configs = new HashMap<>();
ConsumerBuilderCustomizer<String> consumerBuilderCustomizer = (builder) -> {
builder.topic(STREAM_APPLICATIONS_TEST_TOPIC);
builder.subscriptionName(STREAM_APPLICATION_TESTS_GROUP);
};
return new DefaultPulsarConsumerFactory<>(pulsarClient, List.of(consumerBuilderCustomizer));
}

@Bean
public ConcurrentPulsarListenerContainerFactory<String> pulsarListenerContainerFactory(
@Value("pulsarConsumerFactory") PulsarConsumerFactory consumerFactory) {
return new ConcurrentPulsarListenerContainerFactory<>(consumerFactory, new PulsarContainerProperties());
}

@Bean
public PulsarProducerFactory<String> producerFactory(PulsarClient pulsarClient) {
return new DefaultPulsarProducerFactory<>(pulsarClient, STREAM_APPLICATIONS_TEST_TOPIC);

}

@Bean
public PulsarAdmin admin(@Value("${spring.pulsar.admin.service-url}") String adminUrl) throws PulsarClientException {
PulsarAdminBuilderImpl adminBuilder = new PulsarAdminBuilderImpl();
adminBuilder.serviceHttpUrl(adminUrl);
return adminBuilder.build();
}

@Bean
public PulsarListenerEndpointRegistry pulsarListenerEndpointRegistry() {
return new PulsarListenerEndpointRegistry();
}

@Bean
public PulsarTestListener testListener(PulsarListenerEndpointRegistry endpointRegistry) {
return new PulsarTestListener(endpointRegistry);
}

@PulsarListener(autoStartup = "true", topicPattern = STREAM_APPLICATIONS_TEST_TOPIC)
static class PulsarTestListener extends AbstractTestTopicListener {

PulsarTestListener(PulsarListenerEndpointRegistry endpointRegistry) {
super();
}

@Override
public boolean addMessageMatcher(String topic, MessageMatcher messageMatcher) {
return super.addMessageMatcher(topic, messageMatcher);
}

@Override
protected Function<Message<?>, String> topicForMessage() {
return message -> (String) message.getHeaders().get(PulsarHeaders.TOPIC_NAME);
}

@PulsarListener
public void listen(Message<?> message) {
super.listen(message);
}
}

static class PulsarTemplateTopicSender implements TestTopicSender {

private final PulsarTemplate pulsarTemplate;

PulsarTemplateTopicSender(PulsarTemplate pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}

@Override
public <P> void send(String topic, P payload) {
doSend(topic, payload);
}

@Override
public void send(String topic, Message<?> message) {
doSend(topic, message);
}

private void doSend(String topic, Object payload) {
pulsarTemplate.send(topic, payload);
}
}

}
Loading
Loading