Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static void startCluster() throws IOException {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
STREAMS_CONFIG.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MISSING_TOPIC_DETECTION_TIMEOUT_MS);
}

@AfterAll
Expand All @@ -66,6 +67,7 @@ public static void closeCluster() {

private static final String APP_ID = "join-incomplete-metadata-integration-test";
private static final Long COMMIT_INTERVAL = 100L;
private static final int MISSING_TOPIC_DETECTION_TIMEOUT_MS = 5000;
static final Properties STREAMS_CONFIG = new Properties();
static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist";
Expand Down Expand Up @@ -93,7 +95,8 @@ public void cleanup() throws InterruptedException, IOException {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final boolean useNewProtocol) throws InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old");
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

if (useNewProtocol) {
Expand Down