diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java index 2fd67da5063..3ab66e12d75 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java @@ -299,9 +299,16 @@ public long loadDataBase() throws IOException { /** * Fast-forward the database adding transactions from the committed log into memory. * @return the last valid zxid. - * @throws IOException + * @throws IOException IO or inconsistent database error */ public long fastForwardDataBase() throws IOException { + long lastLoggedZxid = snapLog.getLastLoggedZxid(); + if (lastLoggedZxid < dataTree.lastProcessedZxid) { + String msg = String.format("memory database(zxid: 0x%s) is ahead of disk(zxid: 0x%s)", + Long.toHexString(dataTree.lastProcessedZxid), + Long.toHexString(lastLoggedZxid)); + throw new IOException(msg); + } long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; return zxid; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 11ec1fb7413..d5344397968 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -935,7 +935,7 @@ public final synchronized void shutdown(boolean fullyShutDown) { // This will fast-forward the database to the last recorded transaction zkDb.fastForwardDataBase(); } catch (IOException e) { - LOG.error("Error updating DB", e); + LOG.error("Failed to update memory database, will clear it to avoid inconsistency", e); fullyShutDown = true; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java index e14e510c2be..267e17e4cbf 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnLog.java @@ -147,6 +147,7 @@ public class FileTxnLog implements TxnLog, Closeable { } long lastZxidSeen; + long lastZxidFlushed; volatile BufferedOutputStream logStream = null; volatile OutputArchive oa; volatile FileOutputStream fos = null; @@ -366,7 +367,13 @@ public static File[] getLogFiles(File[] logDirList, long snapshotZxid) { * get the last zxid that was logged in the transaction logs * @return the last zxid logged in the transaction logs */ - public long getLastLoggedZxid() { + @Override + public long getLastLoggedZxid() throws IOException { + long lastFlushedZxid = getLastFlushedZxid(); + if (lastFlushedZxid > 0) { + return lastFlushedZxid; + } + File[] files = getLogFiles(logDir.listFiles(), 0); long maxLog = files.length > 0 ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1; @@ -381,8 +388,6 @@ public long getLastLoggedZxid() { TxnHeader hdr = itr.getHeader(); zxid = hdr.getZxid(); } - } catch (IOException e) { - LOG.warn("Unexpected exception", e); } return zxid; } @@ -427,6 +432,7 @@ public synchronized void commit() throws IOException { ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS); } } + lastZxidFlushed = lastZxidSeen; while (streamsToFlush.size() > 1) { streamsToFlush.poll().close(); } @@ -442,6 +448,10 @@ public synchronized void commit() throws IOException { } } + private synchronized long getLastFlushedZxid() { + return lastZxidFlushed; + } + /** * * @return elapsed sync time of transaction log in milliseconds @@ -494,8 +504,13 @@ public boolean truncate(long zxid) throws IOException { while (itr.goToNextLog()) { if (!itr.logFile.delete()) { LOG.warn("Unable to truncate {}", itr.logFile); + throw new IOException("Unable to truncate " + itr.logFile); } } + synchronized (this) { + lastZxidSeen = zxid; + lastZxidFlushed = zxid; + } } return true; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 2816826046e..2ec275fb29a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -457,9 +457,10 @@ public void processTransaction( * the last logged zxid on the transaction logs * @return the last logged zxid */ - public long getLastLoggedZxid() { - FileTxnLog txnLog = new FileTxnLog(dataDir); - return txnLog.getLastLoggedZxid(); + public long getLastLoggedZxid() throws IOException { + SnapshotInfo snapshotInfo = snapLog.getLastSnapshotInfo(); + long lastSnapZxid = snapshotInfo == null ? -1 : snapshotInfo.zxid; + return Long.max(lastSnapZxid, txnLog.getLastLoggedZxid()); } /** diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 9a18d59d401..378ae9271a0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2263,7 +2263,8 @@ public void setZKDatabase(ZKDatabase database) { this.zkDb = database; } - protected ZKDatabase getZkDb() { + // @VisibleForTesting + public ZKDatabase getZkDb() { return zkDb; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java index a9be09f3973..3da9d9ea2c1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/DIFFSyncTest.java @@ -293,7 +293,7 @@ public void commit(final Request request) { private void logEpochsAndLastLoggedTxnForAllServers() throws Exception { for (int i = 0; i < SERVER_COUNT; i++) { final QuorumPeer qp = mt[i].getQuorumPeer(); - if (qp != null) { + if (qp != null && qp.getZkDb().isInitialized()) { LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s", qp.getMyId(), qp.getAcceptedEpoch(), qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid()))); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java new file mode 100644 index 00000000000..dcea06a06dd --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncDiskErrorTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.io.IOException; +import java.util.Comparator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.QuorumUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class SyncDiskErrorTest extends ZKTestCase { + private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + QuorumUtil qu; + + @BeforeEach + public void setUp() { + // write and sync every txn + System.setProperty("zookeeper.maxBatchSize", "1"); + } + + @AfterEach + public void tearDown() { + System.clearProperty("zookeeper.maxBatchSize"); + if (qu != null) { + qu.shutdownAll(); + } + } + + @Test + public void testFollowerRejoinAndRestartAfterTemporaryDiskError() throws Exception { + class Context { + final AtomicLong followerId = new AtomicLong(-1); + final CompletableFuture hang = new CompletableFuture<>(); + final CompletableFuture error = new CompletableFuture<>(); + } + Context context = new Context(); + final int N = 1; + qu = new QuorumUtil(N) { + @Override + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + QuorumPeer peer = super.newQuorumPeer(ps); + peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()) { + @Override + public void commit() throws IOException { + if (peer.follower != null && peer.getMyId() == context.followerId.get()) { + context.hang.complete(null); + context.error.join(); + throw new IOException("temporary disk error"); + } + super.commit(); + } + }); + return peer; + } + }; + qu.startAll(); + + int followerId = (int) qu.getFollowerQuorumPeers().get(0).getMyId(); + String followerConnectString = qu.getConnectionStringForServer(followerId); + + // Connect to leader to avoid connection to faulty node. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: follower disk hang temporarily and error + context.followerId.set(followerId); + + // given: multiple write txn committed meanwhile + for (int i = 1; i < 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + zk.create("/foo" + 10, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + context.hang.join(); + context.followerId.set(-1); + context.error.complete(null); + + // given: re-join after disk error + ClientBase.waitForServerUp(followerConnectString, CONNECTION_TIMEOUT); + + // given: follower state is good + try (ZooKeeper followerZk = ClientBase.createZKClient(followerConnectString)) { + followerZk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(followerZk.exists(path, false), path + " not found"); + } + } + + // given: more write txns + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/bar" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: restart follower node + qu.shutdown(followerId); + qu.restart(followerId); + + // then: follower state should still be good too + try (ZooKeeper zk = ClientBase.createZKClient(followerConnectString)) { + for (int i = 1; i <= 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } + + @Test + public void testDiffSyncAfterTemporaryDiskErrorAndLeading() throws Exception { + class Context { + final AtomicLong followerId = new AtomicLong(-1); + final CompletableFuture hang = new CompletableFuture<>(); + final CompletableFuture error = new CompletableFuture<>(); + } + Context context = new Context(); + // N >= 2, so we can commit with stale follower and error follower + final int N = 2; + qu = new QuorumUtil(N) { + @Override + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + QuorumPeer peer = super.newQuorumPeer(ps); + peer.setZKDatabase(new ZKDatabase(peer.getTxnFactory()) { + @Override + public void commit() throws IOException { + if (peer.follower != null && peer.getMyId() == context.followerId.get()) { + context.hang.complete(null); + context.error.join(); + throw new IOException("temporary disk error"); + } + super.commit(); + } + }); + // Force DIFF sync + peer.getZkDb().setSnapshotSizeFactor(1000000); + return peer; + } + }; + qu.startAll(); + + int[] followerIds = qu.getFollowerQuorumPeers() + .stream() + .sorted(Comparator.comparingLong(QuorumPeer::getMyId).reversed()) + .mapToInt(peer -> (int) peer.getMyId()) + .toArray(); + int followerId = followerIds[0]; + String followerConnectString = qu.getConnectionStringForServer(followerId); + + int staleFollowerId = followerIds[1]; + + // Connect to leader to avoid connection to faulty node. + String leaderConnectString = qu.getConnectString(qu.getLeaderQuorumPeer()); + try (ZooKeeper zk = ClientBase.createZKClient(leaderConnectString)) { + // given: another stale follower + qu.shutdown(staleFollowerId); + + // given: follower disk hang temporarily and error + context.followerId.set(followerId); + + // given: multiple write txn committed meanwhile + for (int i = 1; i < 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/foo" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + zk.create("/foo" + 10, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + context.hang.join(); + context.followerId.set(-1); + context.error.complete(null); + + // given: re-join after disk error + ClientBase.waitForServerUp(followerConnectString, CONNECTION_TIMEOUT); + + // given: follower state is good + try (ZooKeeper followerZk = ClientBase.createZKClient(followerConnectString)) { + followerZk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(followerZk.exists(path, false), path + " not found"); + } + } + + // given: more write txns + for (int i = 1; i <= 10; i++) { + // Creates them asynchronous to mimic concurrent operations. + zk.create( + "/bar" + i, + new byte[0], + ZooDefs.Ids.READ_ACL_UNSAFE, + CreateMode.PERSISTENT, + (rc, path, ctx, name) -> {}, + null); + } + } + + // when: become leader + while (qu.getLeaderServer() != followerId) { + int leaderId = qu.getLeaderServer(); + long syncTimeout = (long) qu.getLeaderQuorumPeer().getTickTime() * qu.getLeaderQuorumPeer().getSyncLimit(); + qu.shutdown(leaderId); + Thread.sleep(syncTimeout); + qu.restart(leaderId); + } + + // and: write some txns + try (ZooKeeper zk = ClientBase.createZKClient(followerConnectString)) { + for (int i = 1; i <= 10; i++) { + zk.create("/foobar" + i, new byte[0], ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + + // when: DIFF sync to stale follower + qu.restart(staleFollowerId); + + // then: follower state should still be good too + String staleFollowerConnectString = qu.getConnectionStringForServer(staleFollowerId); + try (ZooKeeper zk = ClientBase.createZKClient(staleFollowerConnectString)) { + zk.sync("/"); + for (int i = 1; i <= 10; i++) { + String path = "/bar" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + for (int i = 1; i <= 10; i++) { + String path = "/foo" + i; + assertNotNull(zk.exists(path, false), path + " not found"); + } + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java index 865d82b0c7c..0cf1b5bf164 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java @@ -120,7 +120,7 @@ public QuorumUtil(int n, int syncLimit) throws RuntimeException { for (int i = 1; i <= ALL; ++i) { PeerStruct ps = peers.get(i); LOG.info("Creating QuorumPeer {}; public port {}", i, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); assertEquals(ps.clientPort, ps.peer.getClientPort()); } } catch (Exception e) { @@ -143,6 +143,10 @@ public void enableLocalSession(boolean localSessionEnabled) { this.localSessionEnabled = localSessionEnabled; } + protected QuorumPeer newQuorumPeer(PeerStruct ps) throws IOException { + return new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + } + public void startAll() throws IOException { shutdownAll(); for (int i = 1; i <= ALL; ++i) { @@ -206,7 +210,7 @@ public void startQuorum() throws IOException { public void start(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); } @@ -225,7 +229,7 @@ public void restart(int id) throws IOException { public void startThenShutdown(int id) throws IOException { PeerStruct ps = getPeer(id); LOG.info("Creating QuorumPeer {}; public port {}", ps.id, ps.clientPort); - ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit); + ps.peer = newQuorumPeer(ps); if (localSessionEnabled) { ps.peer.enableLocalSessions(true); }