Skip to content

ZOOKEEPER-4882: Fix data loss after rejoin and restart of a node that experienced a temporary disk error #2268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,16 @@ public long loadDataBase() throws IOException {
/**
* Fast-forward the database adding transactions from the committed log into memory.
* @return the last valid zxid.
* @throws IOException
* @throws IOException IO or inconsistent database error
*/
public long fastForwardDataBase() throws IOException {
long lastLoggedZxid = snapLog.getLastLoggedZxid();
if (lastLoggedZxid < dataTree.lastProcessedZxid) {
String msg = String.format("memory database(zxid: 0x%s) is ahead of disk(zxid: 0x%s)",
Long.toHexString(dataTree.lastProcessedZxid),
Long.toHexString(lastLoggedZxid));
throw new IOException(msg);
}
long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ public final synchronized void shutdown(boolean fullyShutDown) {
// This will fast-forward the database to the last recorded transaction
zkDb.fastForwardDataBase();
} catch (IOException e) {
LOG.error("Error updating DB", e);
LOG.error("Failed to update memory database, will clear it to avoid inconsistency", e);
fullyShutDown = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class FileTxnLog implements TxnLog, Closeable {
}

long lastZxidSeen;
long lastZxidFlushed;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
Expand Down Expand Up @@ -366,7 +367,13 @@ public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
* get the last zxid that was logged in the transaction logs
* @return the last zxid logged in the transaction logs
*/
public long getLastLoggedZxid() {
@Override
public long getLastLoggedZxid() throws IOException {
long lastFlushedZxid = getLastFlushedZxid();
if (lastFlushedZxid > 0) {
return lastFlushedZxid;
}

File[] files = getLogFiles(logDir.listFiles(), 0);
long maxLog = files.length > 0 ? Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1;

Expand All @@ -381,8 +388,6 @@ public long getLastLoggedZxid() {
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
return zxid;
}
Expand Down Expand Up @@ -427,6 +432,7 @@ public synchronized void commit() throws IOException {
ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
}
}
lastZxidFlushed = lastZxidSeen;
while (streamsToFlush.size() > 1) {
streamsToFlush.poll().close();
}
Expand All @@ -442,6 +448,10 @@ public synchronized void commit() throws IOException {
}
}

private synchronized long getLastFlushedZxid() {
return lastZxidFlushed;
}

/**
*
* @return elapsed sync time of transaction log in milliseconds
Expand Down Expand Up @@ -494,8 +504,13 @@ public boolean truncate(long zxid) throws IOException {
while (itr.goToNextLog()) {
if (!itr.logFile.delete()) {
LOG.warn("Unable to truncate {}", itr.logFile);
throw new IOException("Unable to truncate " + itr.logFile);
}
}
synchronized (this) {
lastZxidSeen = zxid;
lastZxidFlushed = zxid;
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,10 @@ public void processTransaction(
* the last logged zxid on the transaction logs
* @return the last logged zxid
*/
public long getLastLoggedZxid() {
FileTxnLog txnLog = new FileTxnLog(dataDir);
return txnLog.getLastLoggedZxid();
public long getLastLoggedZxid() throws IOException {
SnapshotInfo snapshotInfo = snapLog.getLastSnapshotInfo();
long lastSnapZxid = snapshotInfo == null ? -1 : snapshotInfo.zxid;
return Long.max(lastSnapZxid, txnLog.getLastLoggedZxid());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2263,7 +2263,8 @@ public void setZKDatabase(ZKDatabase database) {
this.zkDb = database;
}

protected ZKDatabase getZkDb() {
// @VisibleForTesting
public ZKDatabase getZkDb() {
return zkDb;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public void commit(final Request request) {
private void logEpochsAndLastLoggedTxnForAllServers() throws Exception {
for (int i = 0; i < SERVER_COUNT; i++) {
final QuorumPeer qp = mt[i].getQuorumPeer();
if (qp != null) {
if (qp != null && qp.getZkDb().isInitialized()) {
LOG.info(String.format("server id=%d, acceptedEpoch=%d, currentEpoch=%d, lastLoggedTxn=%s",
qp.getMyId(), qp.getAcceptedEpoch(),
qp.getCurrentEpoch(), Long.toHexString(qp.getLastLoggedZxid())));
Expand Down
Loading