
[fix][broker]excessive replication speed leads to error: Producer send queue is full #24189

Open · wants to merge 23 commits into base: master
@@ -305,9 +305,10 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
/**
* This method is only used by {@link PersistentTopic#checkGC} now.
*/
public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) {
@Override
public CompletableFuture<Void> disconnect() {
long backlog = getNumberOfEntriesInBacklog();
if (failIfHasBacklog && backlog > 0) {
if (backlog > 0) {
CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
if (log.isDebugEnabled()) {
@@ -317,9 +318,30 @@ public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean clos
}
log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
getReplicatorReadPosition(), backlog);
return closeProducerAsync(closeTheStartingProducer);
return beforeDisconnect()
.thenCompose(__ -> closeProducerAsync(true))
.thenApply(__ -> {
afterDisconnected();
return null;
});
}

/**
* This method and {@link #afterDisconnected()} are used to solve the following race condition:
* - Thread 1: calls disconnect and passes the check: no backlog.
* - Thread 2: publishes a message, then the cursor.pendingRead completes.
* - Thread 1: continues to disconnect.
* - Thread 2: reads entries from the cursor and tries to send messages, but the messages are discarded because
*   the producer is closed.
* Issue: the pending read's read position is no longer correct.
*/
protected CompletableFuture<Void> beforeDisconnect() {
return CompletableFuture.completedFuture(null);
}

protected void afterDisconnected() {}

/**
* This method is only used by {@link PersistentTopic#checkGC} now.
*/
@@ -398,12 +420,15 @@ protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> producer
});
}

protected abstract void beforeTerminate();

public CompletableFuture<Void> terminate() {
if (!tryChangeStatusToTerminating()) {
log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId,
state);
return CompletableFuture.completedFuture(null);
}
beforeTerminate();
return doCloseProducerAsync(producer, () -> {
STATE_UPDATER.set(this, State.Terminated);
this.producer = null;
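To make the intent of the new beforeDisconnect()/afterDisconnected() hooks easier to follow, here is a minimal, self-contained sketch of the pattern they enable: a subclass sets a flag before the producer is closed, so a read that completes after the backlog check is dropped instead of being handed to a producer that is shutting down, and clears the flag once the disconnect finishes. The class and member names below (ReplicatorDisconnectSketch, disconnecting, getBacklog, closeProducerAsync) are illustrative assumptions, not the PR's actual implementation.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

abstract class ReplicatorDisconnectSketch {
    // Set in beforeDisconnect() so reads that complete after the backlog check
    // can be discarded instead of being sent through a producer that is closing.
    protected final AtomicBoolean disconnecting = new AtomicBoolean(false);

    protected CompletableFuture<Void> beforeDisconnect() {
        disconnecting.set(true);
        return CompletableFuture.completedFuture(null);
    }

    protected void afterDisconnected() {
        disconnecting.set(false);
    }

    public CompletableFuture<Void> disconnect() {
        if (getBacklog() > 0) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Cannot close a replicator with backlog"));
        }
        return beforeDisconnect()
                .thenCompose(ignore -> closeProducerAsync())
                .thenApply(ignore -> {
                    afterDisconnected();
                    return null;
                });
    }

    protected abstract long getBacklog();

    protected abstract CompletableFuture<Void> closeProducerAsync();
}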
@@ -33,7 +33,7 @@ public interface Replicator {

CompletableFuture<Void> terminate();

CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer);
CompletableFuture<Void> disconnect();

void updateRates();

@@ -258,4 +258,9 @@ public long getNumberOfEntriesInBacklog() {
protected void disableReplicatorRead() {
// No-op
}

@Override
protected void beforeTerminate() {
// No-op
}
}
@@ -81,7 +81,7 @@ protected CompletableFuture<Void> prepareCreateProducer() {
}

@Override
protected boolean replicateEntries(List<Entry> entries) {
protected boolean replicateEntries(List<Entry> entries, final InFlightTask inFlightTask) {
boolean atLeastOneMessageSentForReplication = false;
boolean isEnableReplicatedSubscriptions =
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
@@ -90,12 +90,13 @@ protected boolean replicateEntries(List<Entry> entries) {
// This flag is set to true when we skip at least one local message,
// in order to skip remaining local messages.
boolean isLocalMessageSkippedOnce = false;
boolean skipRemainingMessages = false;
boolean skipRemainingMessages = inFlightTask.isSkipReadResultDueToCursorRewound();
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
// Skip the messages since the replicator need to fetch the schema info to replicate the schema to the
// remote cluster. Rewind the cursor first and continue the message read after fetched the schema.
if (skipRemainingMessages) {
inFlightTask.incCompletedEntries();
entry.release();
continue;
}
@@ -108,12 +109,14 @@ protected boolean replicateEntries(List<Entry> entries) {
log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId,
entry.getPosition(), length, t.getMessage(), t);
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
continue;
}

if (Markers.isTxnMarker(msg.getMessageBuilder())) {
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -123,6 +126,7 @@ protected boolean replicateEntries(List<Entry> entries) {
msg.getMessageBuilder().getTxnidLeastBits());
if (topic.isTxnAborted(tx, entry.getPosition())) {
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -136,6 +140,7 @@ protected boolean replicateEntries(List<Entry> entries) {
if (msg.isReplicated()) {
// Discard messages that were already replicated into this region
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -147,6 +152,7 @@ protected boolean replicateEntries(List<Entry> entries) {
entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -159,19 +165,21 @@ protected boolean replicateEntries(List<Entry> entries) {
replicatorId, entry.getPosition(), msg.getReplicateTo());
}
cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
}

if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) {
// The producer is not ready yet after having stopped/restarted. Drop the message because it will
// recovered when the producer is ready
// recover when the producer is ready
if (log.isDebugEnabled()) {
log.debug("[{}] Dropping read message at {} because producer is not ready",
replicatorId, entry.getPosition());
}
isLocalMessageSkippedOnce = true;
inFlightTask.incCompletedEntries();
entry.release();
msg.recycle();
continue;
@@ -184,24 +192,31 @@ protected boolean replicateEntries(List<Entry> entries) {

CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
/**
* Skip in-flight reading tasks.
* Regarding the race condition between:
* - {@link #readMoreEntries}
* - {@link #beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)}
* it is safe, because {@link #acquirePermitsIfNotFetchingSchema} and
* {@link #beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding)} acquire the
* same lock.
*/
beforeTerminateOrCursorRewinding(ReasonOfWaitForCursorRewinding.Fetching_Schema);
inFlightTask.incCompletedEntries();
entry.release();
headersAndPayload.release();
msg.recycle();
// Mark the replicator is fetching the schema for now and rewind the cursor
// and trigger the next read after complete the schema fetching.
fetchSchemaInProgress = true;
skipRemainingMessages = true;
cursor.cancelPendingReadRequest();
log.info("[{}] Pause the data replication due to new detected schema", replicatorId);
schemaFuture.whenComplete((__, e) -> {
if (e != null) {
log.warn("[{}] Failed to get schema from local cluster, will try in the next loop",
replicatorId, e);
}
log.info("[{}] Resume the data replication after the schema fetching done", replicatorId);
cursor.rewind();
fetchSchemaInProgress = false;
readMoreEntries();
doRewindCursor(true);
});
} else {
msg.setSchemaInfoForReplicator(schemaFuture.get());
@@ -214,11 +229,10 @@ protected boolean replicateEntries(List<Entry> entries) {
stats.incrementMsgOutCounter();
stats.incrementBytesOutCounter(headersAndPayload.readableBytes());
// Increment pending messages for messages produced locally
PENDING_MESSAGES_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] Publishing {}:{}", replicatorId, entry.getLedgerId(), entry.getEntryId());
}
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg, inFlightTask));
atLeastOneMessageSentForReplication = true;
}
}
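Every branch of the loop above that filters out an entry now calls inFlightTask.incCompletedEntries(), and the same task is handed to ProducerSendCallback so successfully sent entries are counted as well. As a rough mental model of why this bounds the remote producer's pending-send queue (the class below is an illustrative stand-in with assumed internals, not Pulsar's actual InFlightTask), the task can be read as a per-read completion counter that only lets the replicator issue the next read once every entry from the previous read has been either filtered locally or acknowledged by the remote producer.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the per-read bookkeeping; names and behaviour are assumptions.
final class InFlightReadTaskSketch {
    private final int totalEntries;                  // entries handed to replicateEntries() for one read
    private final AtomicInteger completedEntries = new AtomicInteger();
    private final Runnable onAllEntriesCompleted;    // e.g. release read permits / trigger the next read
    private volatile boolean skipReadResultDueToCursorRewound;

    InFlightReadTaskSketch(int totalEntries, Runnable onAllEntriesCompleted) {
        this.totalEntries = totalEntries;
        this.onAllEntriesCompleted = onAllEntriesCompleted;
    }

    // Called once per entry, whether it was filtered out locally or confirmed by the remote send callback.
    void incCompletedEntries() {
        if (completedEntries.incrementAndGet() == totalEntries) {
            onAllEntriesCompleted.run();
        }
    }

    // When the cursor was rewound (e.g. to fetch a schema), the whole read result is skipped,
    // but each entry is still counted as completed so no permits are leaked.
    void markSkipReadResultDueToCursorRewound() {
        skipReadResultDueToCursorRewound = true;
    }

    boolean isSkipReadResultDueToCursorRewound() {
        return skipReadResultDueToCursorRewound;
    }
}

Under that model, a new read is gated on the previous batch draining, which would keep the remote producer's send queue from overflowing; the removal of the per-message PENDING_MESSAGES_UPDATER increment above is consistent with moving that accounting into the per-read task.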
@@ -95,8 +95,9 @@ public enum MessageDupStatus {
}

public static class MessageDupUnknownException extends RuntimeException {
public MessageDupUnknownException() {
super("Cannot determine whether the message is a duplicate at this time");
public MessageDupUnknownException(String topicName, String producerName) {
super(String.format("[%s][%s]Cannot determine whether the message is a duplicate at this time", topicName,
producerName));
}
}
