From 6e44048377d204cfcd7e6069c04a37300265a954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E7=AB=8B=E5=AE=B6?= Date: Fri, 12 Sep 2025 14:34:56 +0800 Subject: [PATCH] [Fix](bdb) Fix feeder handshake error --- .../sleepycat/je/rep/impl/node/Feeder.java | 33 +++++++++++++++++++ .../je/rep/stream/FeederReplicaHandshake.java | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java b/src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java index 3b34f493ce5..07ba5cf24f3 100644 --- a/src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java +++ b/src/main/java/com/sleepycat/je/rep/impl/node/Feeder.java @@ -17,6 +17,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.nio.ByteBuffer; import java.nio.channels.Channel; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -1753,4 +1754,36 @@ public void makeSecurityCheckResponse(String err) throw new ReplicationSecurityException(err, replica, null); } + + /** + * Refer to https://github.com/StarRocks/starrocks-bdb-je/pull/17 + * Check whether the channel is available by monitoring the change of lastHeartbeatTime, + * the loop will break when: + * 1. feeder is shutdown + * 2. channel is not open + * 3. lastHeartbeatTime changed + * 4. 
FEEDER_TIMEOUT reached, or the waiting thread is interrupted
+     */
+    public boolean isChannelAvailable() {
+        long baseTime = this.lastHeartbeatTime;
+        long startNs = System.nanoTime();
+        long timeoutNs = repNode.getConfigManager().getDuration(RepParams.FEEDER_TIMEOUT) * 1000000L;
+        while (System.nanoTime() - startNs < timeoutNs) {
+            if (shutdown.get() || !feederReplicaChannel.isOpen()) {
+                return false;
+            }
+            if (baseTime < lastHeartbeatTime) {
+                return true;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(500);
+            } catch (InterruptedException ie) {
+                // Restore the interrupt flag and stop waiting; retrying sleep() would busy-spin.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+
+        return false;
+    }
 }
diff --git a/src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java b/src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java
index aff98ec2113..4ffca8a75d1 100644
--- a/src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java
+++ b/src/main/java/com/sleepycat/je/rep/stream/FeederReplicaHandshake.java
@@ -611,7 +611,7 @@ private Protocol negotiateProtocol()
                  * issue a shutdown to the feeder explicitly.
                  */
                 if (dup != null && dup.getChannel() != null &&
-                    !dup.getChannel().isOpen() && !dup.isShutdown()) {
+                    !dup.isChannelAvailable()) {
                     dup.shutdown(new IOException("Feeder's channel for node " +
                                                  replicaNameIdPair +
                                                  " is already closed"));