
Commit 424c175

[fix][broker]excessive replication speed leads to error: Producer send queue is full
1 parent c6be44c


11 files changed: +417 -95 lines
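
The commit title names the failure mode: the replicator hands entries to the remote cluster faster than its producer can send them, the producer's bounded send queue fills up, and publishes fail with "Producer send queue is full". The changes below thread an InFlightTask through replicateEntries(...) and count every entry that reaches a terminal state, giving the replicator a handle on how much work is still outstanding before it reads more. As a rough illustration of the back-pressure idea only (a semaphore-based sketch with a hypothetical maxInFlight bound and RemoteProducer interface, not Pulsar's actual InFlightTask API):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Illustrative back-pressure sketch, not Pulsar code: bound the number of
// entries handed to the remote producer so its send queue cannot overflow.
public class BoundedReplicationSketch {

    // Hypothetical minimal producer interface for the sketch.
    public interface RemoteProducer {
        CompletableFuture<Long> sendAsync(byte[] payload);
    }

    private final Semaphore inFlight;

    public BoundedReplicationSketch(int maxInFlight) {
        this.inFlight = new Semaphore(maxInFlight);
    }

    // Replicate a batch, blocking further dispatch once maxInFlight sends are outstanding.
    public void replicate(List<byte[]> entries, RemoteProducer producer) throws InterruptedException {
        for (byte[] payload : entries) {
            inFlight.acquire();                      // wait instead of overfilling the send queue
            producer.sendAsync(payload)
                    .whenComplete((messageId, error) -> inFlight.release()); // entry complete, free a slot
        }
    }
}

The sketch's only point is that acquiring a permit before each asynchronous send, and releasing it when the send completes, caps how many messages can sit in the producer's queue at once.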

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 3 additions & 0 deletions
@@ -398,12 +398,15 @@ protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> producer
         });
     }
 
+    protected abstract void beforeTerminate();
+
     public CompletableFuture<Void> terminate() {
         if (!tryChangeStatusToTerminating()) {
             log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId,
                     state);
             return CompletableFuture.completedFuture(null);
         }
+        beforeTerminate();
         return doCloseProducerAsync(producer, () -> {
             STATE_UPDATER.set(this, State.Terminated);
             this.producer = null;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java

Lines changed: 5 additions & 0 deletions
@@ -258,4 +258,9 @@ public long getNumberOfEntriesInBacklog() {
     protected void disableReplicatorRead() {
         // No-op
     }
+
+    @Override
+    protected void beforeTerminate() {
+        // No-op
+    }
 }

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 23 additions & 9 deletions
@@ -81,7 +81,7 @@ protected CompletableFuture<Void> prepareCreateProducer() {
     }
 
     @Override
-    protected boolean replicateEntries(List<Entry> entries) {
+    protected boolean replicateEntries(List<Entry> entries, InFlightTask inFlightTask) {
         boolean atLeastOneMessageSentForReplication = false;
         boolean isEnableReplicatedSubscriptions =
                 brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
@@ -90,12 +90,13 @@ protected boolean replicateEntries(List<Entry> entries) {
             // This flag is set to true when we skip at least one local message,
             // in order to skip remaining local messages.
             boolean isLocalMessageSkippedOnce = false;
-            boolean skipRemainingMessages = false;
+            boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewound();
             for (int i = 0; i < entries.size(); i++) {
                 Entry entry = entries.get(i);
                 // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
                 // remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
                 if (skipRemainingMessages) {
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     continue;
                 }
@@ -108,12 +109,14 @@ protected boolean replicateEntries(List<Entry> entries) {
                     log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId,
                             entry.getPosition(), length, t.getMessage(), t);
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     continue;
                 }
 
                 if (Markers.isTxnMarker(msg.getMessageBuilder())) {
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -123,6 +126,7 @@ protected boolean replicateEntries(List<Entry> entries) {
                             msg.getMessageBuilder().getTxnidLeastBits());
                     if (topic.isTxnAborted(tx, entry.getPosition())) {
                         cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                        inFlightTask.incCompletedEntries();
                         entry.release();
                         msg.recycle();
                         continue;
@@ -136,6 +140,7 @@ protected boolean replicateEntries(List<Entry> entries) {
                 if (msg.isReplicated()) {
                     // Discard messages that were already replicated into this region
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -147,6 +152,7 @@ protected boolean replicateEntries(List<Entry> entries) {
                                 entry.getPosition(), msg.getReplicateTo());
                     }
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -159,19 +165,21 @@ protected boolean replicateEntries(List<Entry> entries) {
                                 replicatorId, entry.getPosition(), msg.getReplicateTo());
                     }
                     cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
                 }
 
                 if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
                     // The producer is not ready yet after having stopped/restarted. Drop the message because it will
-                    // recovered when the producer is ready
+                    // recover when the producer is ready
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Dropping read message at {} because producer is not ready",
                                 replicatorId, entry.getPosition());
                     }
                     isLocalMessageSkippedOnce = true;
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     msg.recycle();
                     continue;
@@ -184,24 +192,31 @@ protected boolean replicateEntries(List<Entry> entries) {
 
                 CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
                 if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
+                    /**
+                     * Skip in flight reading tasks.
+                     * Explain the result of the race-condition between:
+                     * - {@link #readMoreEntries}
+                     * - {@link #beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)}
+                     * Since {@link #acquirePermitsIfNotFetchingSchema} and
+                     * {@link #beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)} acquire the
+                     * same lock, it is safe.
+                     */
+                    beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Fetching_Schema);
+                    inFlightTask.incCompletedEntries();
                     entry.release();
                     headersAndPayload.release();
                     msg.recycle();
                     // Mark the replicator is fetching the schema for now and rewind the cursor
                     // and trigger the next read after complete the schema fetching.
-                    fetchSchemaInProgress = true;
                     skipRemainingMessages = true;
-                    cursor.cancelPendingReadRequest();
                     log.info("[{}] Pause the data replication due to new detected schema", replicatorId);
                     schemaFuture.whenComplete((__, e) -> {
                         if (e != null) {
                             log.warn("[{}] Failed to get schema from local cluster, will try in the next loop",
                                     replicatorId, e);
                         }
                         log.info("[{}] Resume the data replication after the schema fetching done", replicatorId);
-                        cursor.rewind();
-                        fetchSchemaInProgress = false;
-                        readMoreEntries();
+                        doRewindCursor(true);
                     });
                 } else {
                     msg.setSchemaInfoForReplicator(schemaFuture.get());
@@ -214,7 +229,6 @@ protected boolean replicateEntries(List<Entry> entries) {
                     stats.incrementMsgOutCounter();
                     stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
                     // Increment pending messages for messages produced locally
-                    PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
                     }
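
One detail worth noting in the hunks above: every early-continue branch that releases an entry without replicating it (deserialization failure, transaction marker, aborted transaction, already-replicated message, excluded cluster, producer not ready, pending schema fetch) now also calls inFlightTask.incCompletedEntries(), and the in-code comment notes that the race between readMoreEntries and beforeTerminateOrCursorRewinding(...) is resolved by both paths taking the same lock. The accounting only works if the completed count eventually matches the number of entries handed to replicateEntries, whatever each entry's outcome. A minimal sketch of that invariant, using hypothetical names (ReadBatchAccounting, expectedEntries, onAllEntriesDone) rather than the real InFlightTask API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative per-read-batch accounting sketch with hypothetical names, not Pulsar's InFlightTask.
final class ReadBatchAccounting {

    private final int expectedEntries;
    private final AtomicInteger completedEntries = new AtomicInteger();
    private final CompletableFuture<Void> allDone = new CompletableFuture<>();

    ReadBatchAccounting(int expectedEntries) {
        this.expectedEntries = expectedEntries;
    }

    // Called once per entry for every terminal outcome: sent, skipped, or dropped.
    void incCompletedEntries() {
        if (completedEntries.incrementAndGet() == expectedEntries) {
            allDone.complete(null); // whole batch accounted for; safe to read more or rewind the cursor
        }
    }

    // Completes only when every entry of the read batch has been counted.
    CompletableFuture<Void> onAllEntriesDone() {
        return allDone;
    }
}

If any branch skipped the increment, the batch would never be seen as complete and a later rewind or read could wait indefinitely, which is why the increment appears next to every entry.release() in the diff.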
