diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java index 04f35dcfce400..291401c82fd7e 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java @@ -57,6 +57,7 @@ public static void startCluster() throws IOException { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + STREAMS_CONFIG.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MISSING_TOPIC_DETECTION_TIMEOUT_MS); } @AfterAll @@ -66,6 +67,7 @@ public static void closeCluster() { private static final String APP_ID = "join-incomplete-metadata-integration-test"; private static final Long COMMIT_INTERVAL = 100L; + private static final int MISSING_TOPIC_DETECTION_TIMEOUT_MS = 5000; static final Properties STREAMS_CONFIG = new Properties(); static final String INPUT_TOPIC_RIGHT = "inputTopicRight"; static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist"; @@ -93,7 +95,8 @@ public void cleanup() throws InterruptedException, IOException { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final boolean useNewProtocol) throws InterruptedException { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old"); + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); if (useNewProtocol) {