Skip to content

[fix][broker]excessive replication speed leads to error: Producer send queue is full #24189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: master
Choose a base branch
from

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Apr 21, 2025

Motivation

Background

  • Replication has a limitation that limits the maximum in-flight publishing message.
    • Users can configure the quota by replicationProducerQueueSize
    • Replication has a variable named pendingMessages that records how many messages are pending to be published
    • When there is a task that fetches schemas, the replicator will pause and mark fetchSchemaInProgress as true
    • When the replicator needs to rewind the cursor, the replicator will pause and mark waitForCursorRewinding as true
  • Replication allows at most one inflight cursor reading, which was limited by havePendingRead.
  • Reolication allows multi inflight publishing

Issue: The multiple mechanisms described above can not work well

time/thread read more entries A read more entries B
1 calculate permits: got 1000 calculate permits: got 1000
2 set havePendingRead -> true
3 start reading
4 read out 1000 msgs
5 publishing is messages, and decrease permits one by one
6 set havePendingRead -> true
7 the 1000 msgs are still in-progress
8 start reading
9 read out 1000 msgs
10 publishing is messages, and decrease permits one by one
11 There are 2000 msgs in publishing, which is more than expected, get error Producer send queue is full
12 rewind cursor and trigger more read more entries, leads the situation bader

pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/output-partition-0-pulsar.repl.prod-repl] Rewind from 2111423:1 to 2111423
:0
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]

Modifications

  • merge multiple mechanisms into one and fix the issue

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 21, 2025
@poorbarcode poorbarcode changed the title [fix][broker]excessive replication speed leads to error: Producer sen… [fix][broker]excessive replication speed leads to error: Producer send queue is full Apr 21, 2025
@poorbarcode poorbarcode self-assigned this Apr 21, 2025
@poorbarcode poorbarcode added this to the 4.1.0 milestone Apr 21, 2025
@poorbarcode poorbarcode added release/3.0.12 release/3.3.7 release/4.0.5 type/bug The PR fixed a bug or issue reported a bug labels Apr 21, 2025
@lhotari
Copy link
Member

lhotari commented Apr 22, 2025

@poorbarcode is there any relationship to PIP-269: Add an epoch of cursor to discard outdated reading or any other previous reported issues ?

@poorbarcode
Copy link
Contributor Author

@lhotari

@poorbarcode is there any relationship to #20469 or any other previous reported issues ?

It does not relate to

@poorbarcode
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode force-pushed the fix/replication_queue_full branch from 424c175 to 1d6d5be Compare May 14, 2025 13:30
@codecov-commenter
Copy link

codecov-commenter commented May 21, 2025

Codecov Report

Attention: Patch coverage is 82.19178% with 39 lines in your changes missing coverage. Please review.

Project coverage is 74.21%. Comparing base (bbc6224) to head (835ec54).
Report is 1128 commits behind head on master.

Files with missing lines Patch % Lines
...roker/service/persistent/PersistentReplicator.java 86.30% 11 Missing and 12 partials ⚠️
...ar/broker/service/persistent/ShadowReplicator.java 22.22% 6 Missing and 1 partial ⚠️
...ache/pulsar/broker/service/AbstractReplicator.java 66.66% 2 Missing and 1 partial ⚠️
.../java/org/apache/pulsar/client/impl/ClientCnx.java 25.00% 2 Missing and 1 partial ⚠️
...er/service/persistent/GeoPersistentReplicator.java 84.61% 2 Missing ⚠️
...sar/broker/service/persistent/PersistentTopic.java 80.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24189      +/-   ##
============================================
+ Coverage     73.57%   74.21%   +0.64%     
- Complexity    32624    32683      +59     
============================================
  Files          1877     1867      -10     
  Lines        139502   145368    +5866     
  Branches      15299    16629    +1330     
============================================
+ Hits         102638   107891    +5253     
- Misses        28908    28931      +23     
- Partials       7956     8546     +590     
Flag Coverage Δ
inttests 26.71% <32.42%> (+2.12%) ⬆️
systests 23.26% <1.36%> (-1.07%) ⬇️
unittests 73.72% <82.19%> (+0.87%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...a/org/apache/pulsar/broker/service/Replicator.java 0.00% <ø> (ø)
...service/nonpersistent/NonPersistentReplicator.java 67.00% <100.00%> (+3.63%) ⬆️
...roker/service/persistent/MessageDeduplication.java 76.83% <100.00%> (-4.08%) ⬇️
...pulsar/client/impl/GeoReplicationProducerImpl.java 49.13% <100.00%> (ø)
...va/org/apache/pulsar/client/impl/ProducerImpl.java 84.24% <100.00%> (+0.65%) ⬆️
...sar/broker/service/persistent/PersistentTopic.java 79.79% <80.00%> (+1.33%) ⬆️
...er/service/persistent/GeoPersistentReplicator.java 71.65% <84.61%> (-6.37%) ⬇️
...ache/pulsar/broker/service/AbstractReplicator.java 67.64% <66.66%> (-17.36%) ⬇️
.../java/org/apache/pulsar/client/impl/ClientCnx.java 69.27% <25.00%> (-2.50%) ⬇️
...ar/broker/service/persistent/ShadowReplicator.java 45.90% <22.22%> (-12.64%) ⬇️
... and 1 more

... and 1075 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari
Copy link
Member

lhotari commented May 30, 2025

test failure:

 Error:  Tests run: 8, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 65.679 s <<< FAILURE! - in org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest
  Error:  org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication  Time elapsed: 20.673 s  <<< FAILURE!
  java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Message.getValue()" because the return value of "org.apache.pulsar.client.api.Consumer.receive(int, java.util.concurrent.TimeUnit)" is null
  	at org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication(DisabledCreateTopicToRemoteClusterForReplicationTest.java:111)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

could be a flaky test, attempt to fix was #24141

@lhotari
Copy link
Member

lhotari commented May 30, 2025

there seems to be more flakiness in tests. I can see the first attempt failed in a replication test as well

  Error:  Tests run: 140, Failures: 1, Errors: 0, Skipped: 116, Time elapsed: 343.104 s <<< FAILURE! - in org.apache.pulsar.broker.service.ReplicatorTest
  Error:  org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture  Time elapsed: 16.339 s  <<< FAILURE!
  org.awaitility.core.ConditionTimeoutException: Assertion condition replication task finished expected [true] but found [false] within 10 seconds.
  	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
  	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
  	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
  	at org.apache.pulsar.broker.service.ReplicatorTest.waitReplicateFinish(ReplicatorTest.java:523)
  	at org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture(ReplicatorTest.java:516)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)
  Caused by: java.lang.AssertionError: replication task finished expected [true] but found [false]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertTrue(Assert.java:56)
  	at org.apache.pulsar.broker.service.ReplicatorTest.lambda$waitReplicateFinish$5(ReplicatorTest.java:526)
  	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
  	... 4 more

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please debug the flaky test failures in replication tests before we merge this change.

@lhotari
Copy link
Member

lhotari commented May 30, 2025

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode requested a review from lhotari June 4, 2025 01:50
@poorbarcode
Copy link
Contributor Author

@lhotari

Please debug the flaky test failures in replication tests before we merge this change.

Fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.13 release/3.3.8 release/4.0.6 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants