Skip to content

Commit ee7779c

Browse files
GH-3338: Fix ConcurrentContainer lifecycle issues
Fixes: #3338 * Fix ConcurrentContainer lifecycle issues This commit would fix the following issues. 1) 'isChildRunning' API would return true only after all the containers are actually stopped. 2) ConcurrentContainer `start` would be permitted only after all the containers running status is false. 3) ConcurrentContainer `stop` would be permitted if the container is in running status or if previously `stop` API is not called. 4) Move the logic to verify whether to permit the `stop` call to KafkaMessageListenerContainer and ConcurrentMessageListenerContainer. 5) Add 'stopAbnormally' in a Lock. 6) Set the ConcurrentContainer running status to true after `childStarted` 7) Set the ConcurrentContainer running status to false after `childStopped` 8) Call `childStarted` in ConcurrentContainer from KafkaMessageListenerContainer right before publishing ConsumerStartedEvent. * Fix ContainerGroupSequencer stopParentAndCheckGroup method This commit would fix the issue when exactly the ConcurrentContainer has to be stopped. As per the earlier logic, running status would not be set to false if any of the container is stopped. This is not correct and modified the logic to set running status to false even if one of the container is stopped. So, it is sufficient to call directly stop API on parent container that would internally check if all the containers are stopped and would execute the callback accordingly. As per the review comments, this commit reverts the changes related to the ConcurrentContainer `running` status. Summary of all changes in this PR. 1) 'isChildRunning' API would return true only after all the containers are actually stopped. 2) Add 'stopAbnormally' in a Lock. 3) Call `childStarted` in ConcurrentContainer from KafkaMessageListenerContainer right before publishing ConsumerStartedEvent. * Revert changes in ContainerGroupSequencer * Clear previous childContainers during Start call This commit would change the time at which the childContainers are cleared. Earlier, childContainers are cleared during `stop` call. But, after this change childContainers would be cleared only during the next `start` call. * Clear ChildContainers after all containers stopped. This commit would include the following changes. 1) Clear all the containers after all the child containers stopped. Previous commit clears only during the fresh start. 2) Publish `ConcurrentContainerStoppedEvent` when the ConcurrentContainer and all the child child containers are stopped. But, previously `ConcurrentContainerStoppedEvent` is emitted when all the containers are stopped. **Auto-cherry-pick to `3.2.x` & `3.1.x`**
1 parent f11c7ca commit ee7779c

File tree

4 files changed

+344
-27
lines changed

4 files changed

+344
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
* @author Wang Zhiyang
7272
* @author Soby Chacko
7373
* @author Sanghyeok An
74+
* @author Lokesh Alamuri
7475
*/
7576
public abstract class AbstractMessageListenerContainer<K, V>
7677
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
@@ -660,8 +661,14 @@ public void stop(Runnable callback) {
660661

661662
@Override
662663
public void stopAbnormally(Runnable callback) {
663-
doStop(callback, false);
664-
publishContainerStoppedEvent();
664+
this.lifecycleLock.lock();
665+
try {
666+
doStop(callback, false);
667+
publishContainerStoppedEvent();
668+
}
669+
finally {
670+
this.lifecycleLock.unlock();
671+
}
665672
}
666673

667674
protected void doStop(Runnable callback) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -204,14 +204,20 @@ public boolean isContainerPaused() {
204204

205205
@Override
206206
public boolean isChildRunning() {
207-
if (!isRunning()) {
208-
return false;
209-
}
210-
for (MessageListenerContainer container : this.containers) {
211-
if (container.isRunning()) {
207+
this.lifecycleLock.lock();
208+
try {
209+
for (MessageListenerContainer container : this.containers) {
210+
if (container.isRunning()) {
211+
return true;
212+
}
213+
}
214+
if (this.startedContainers.get() > 0) {
212215
return true;
213216
}
214217
}
218+
finally {
219+
this.lifecycleLock.unlock();
220+
}
215221
return false;
216222
}
217223

@@ -245,7 +251,7 @@ protected void doStart() {
245251
+ topicPartitions.length);
246252
this.concurrency = topicPartitions.length;
247253
}
248-
this.startedContainers.set(0);
254+
clearState();
249255
setRunning(true);
250256

251257
for (int i = 0; i < this.concurrency; i++) {
@@ -373,37 +379,59 @@ protected void doStop(final Runnable callback, boolean normal) {
373379
}
374380
}
375381
}
376-
this.containers.clear();
377382
setStoppedNormally(normal);
383+
// All the containers are stopped before calling stop API
384+
if (this.startedContainers.get() == 0) {
385+
publishConcurrentContainerStoppedEvent(this.reason);
386+
}
378387
}
379388
}
380389

381390
@Override
382391
public void childStarted(MessageListenerContainer child) {
383-
this.startedContainers.incrementAndGet();
392+
this.lifecycleLock.lock();
393+
try {
394+
if (this.containers.contains(child)) {
395+
this.startedContainers.incrementAndGet();
396+
}
397+
}
398+
finally {
399+
this.lifecycleLock.unlock();
400+
}
384401
}
385402

386403
@Override
387404
public void childStopped(MessageListenerContainer child, Reason reason) {
388-
if (this.reason == null || reason.equals(Reason.AUTH)) {
389-
this.reason = reason;
390-
}
391-
int startedContainersCount = this.startedContainers.decrementAndGet();
392-
if (startedContainersCount == 0) {
393-
publishConcurrentContainerStoppedEvent(this.reason);
394-
boolean restartContainer = Reason.AUTH.equals(this.reason)
395-
&& getContainerProperties().isRestartAfterAuthExceptions();
396-
this.reason = null;
397-
398-
if (restartContainer) {
399-
// This has to run on another thread to avoid a deadlock on lifecycleMonitor
400-
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
401-
if (exec == null) {
402-
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
405+
this.lifecycleLock.lock();
406+
try {
407+
if (!this.containers.contains(child)) {
408+
return;
409+
}
410+
if (this.reason == null || reason.equals(Reason.AUTH)) {
411+
this.reason = reason;
412+
}
413+
int startedContainersCount = this.startedContainers.decrementAndGet();
414+
if (startedContainersCount == 0) {
415+
if (!isRunning()) {
416+
this.containers.clear();
417+
publishConcurrentContainerStoppedEvent(this.reason);
418+
}
419+
boolean restartContainer = Reason.AUTH.equals(this.reason)
420+
&& getContainerProperties().isRestartAfterAuthExceptions();
421+
this.reason = null;
422+
if (restartContainer) {
423+
// This has to run on another thread to avoid a deadlock on lifecycleMonitor
424+
AsyncTaskExecutor exec = getContainerProperties().getListenerTaskExecutor();
425+
if (exec == null) {
426+
exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
427+
}
428+
exec.execute(this::start);
403429
}
404-
exec.execute(this::start);
405430
}
406431
}
432+
finally {
433+
this.lifecycleLock.unlock();
434+
}
407435
}
408436

409437
private void publishConcurrentContainerStoppedEvent(Reason reason) {
@@ -517,6 +545,12 @@ private boolean containsPartition(TopicPartition topicPartition, KafkaMessageLis
517545
return assignedPartitions != null && assignedPartitions.contains(topicPartition);
518546
}
519547

548+
private void clearState() {
549+
this.containers.clear();
550+
this.startedContainers.set(0);
551+
this.reason = null;
552+
}
553+
520554
@Override
521555
public String toString() {
522556
return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName="

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@
166166
* @author Christian Mergenthaler
167167
* @author Mikael Carlstedt
168168
* @author Borahm Lee
169+
* @author Lokesh Alamuri
169170
*/
170171
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
171172
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -378,7 +379,6 @@ protected void doStart() {
378379
}
379380
}
380381
this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);
381-
this.thisOrParentContainer.childStarted(this);
382382
setRunning(true);
383383
this.startLatch = new CountDownLatch(1);
384384
this.listenerConsumerFuture = consumerExecutor.submitCompletable(this.listenerConsumer);
@@ -1369,6 +1369,7 @@ protected void initialize() {
13691369
this.count = 0;
13701370
this.last = System.currentTimeMillis();
13711371
initAssignedPartitions();
1372+
KafkaMessageListenerContainer.this.thisOrParentContainer.childStarted(KafkaMessageListenerContainer.this);
13721373
publishConsumerStartedEvent();
13731374
}
13741375

0 commit comments

Comments
 (0)