Skip to content

Commit

Permalink
Fix the following issues in Learner.syncWithLeader():
Browse files Browse the repository at this point in the history
ZOOKEEPER-4394: NullPointerException when follower receives COMMIT after replying NEWLEADER ack
ZOOKEEPER-4643: Committed txns lost when follower crashes after updating currentEpoch
ZOOKEEPER-4646: Committed txns lost when follower crashes after replying NEWLEADER ack
ZOOKEEPER-4685: Leader shutdown due to follower replies PROPOSAL ack before NEWLEADER ack in Synchronization phase
ZOOKEEPER-3023: Flaky tests: Zab1_0Test#testNormalFollowerRunWithDiff
  • Loading branch information
AlphaCanisMajoris committed Mar 31, 2024
1 parent d12aba5 commit cdd13ca
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
* @param hdr the txn header
* @param txn the txn
* @param digest the digest of txn
* @return a request moving through a chain of RequestProcessors
*/
public Request appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = buildRequestToProcess(hdr, txn, digest);
public void appendRequest(final TxnHeader hdr, final Record txn, final TxnDigest digest) throws IOException {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
getZKDatabase().append(request);
return request;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<Request> requestsToAck = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
isPreZAB1_0 = false;

// ZOOKEEPER-3911: make sure sync the uncommitted logs before commit them (ACK NEWLEADER).
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();
if (zk instanceof FollowerZooKeeperServer) {
if (zk instanceof FollowerZooKeeperServer && !packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
final Request request = fzk.appendRequest(p.hdr, p.rec, p.digest);
requestsToAck.add(request);

/*
* @see https://github.com/apache/zookeeper/pull/1848
* Persist and process the committed txns in "packetsNotCommitted"
* according to "packetsCommitted", which have been committed by
* the leader. For these committed proposals, there is no need to
* reply ack.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
* Keep the outstanding proposals in "packetsNotCommitted" to avoid
* NullPointerException when the follower receives COMMIT packet(s)
* right after replying NEWLEADER ack.
*/
while (!packetsCommitted.isEmpty()) {
long zxid = packetsCommitted.removeFirst();
pif = packetsNotCommitted.peekFirst();
if (pif == null) {
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
continue;
} else if (pif.hdr.getZxid() != zxid) {
LOG.warn("Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
continue;
}
packetsNotCommitted.removeFirst();
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
fzk.processTxn(pif.hdr, pif.rec);
}

// persist the txns to disk
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4646
// Make sure to persist the txns to disk before replying NEWLEADER ack.
fzk.getZKDatabase().commit();
LOG.info("{} txns have been persisted and it took {}ms",
packetsNotCommitted.size(), Time.currentElapsedTime() - startTime);
packetsNotCommitted.clear();
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+ "{} outstanding txns left in packetsNotCommitted",
Time.currentElapsedTime() - startTime, packetsNotCommitted.size());
}

// set the current epoch after all the txns are persisted
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4785
// Update current epoch after the committed txns are persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);

// send NEWLEADER ack after all the txns are persisted
// Now we almost complete the synchronization phase. Start RequestProcessors
// to asynchronously process the pending txns in "packetsNotCommitted" and
// "packetsCommitted" later.
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
zk.startupWithoutServing();

// send NEWLEADER ack after the committed txns are persisted
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
LOG.info("Sent NEWLEADER ack to leader with zxid {}", Long.toHexString(newLeaderZxid));
break;
Expand All @@ -796,15 +826,6 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
// reply ACK of PROPOSAL after ACK of NEWLEADER to avoid leader shutdown due to timeout
// on waiting for a quorum of followers
for (final Request request : requestsToAck) {
final QuorumPacket ackPacket = new QuorumPacket(Leader.ACK, request.getHdr().getZxid(), null, null);
writePacket(ackPacket, false);
}
writePacket(null, true);
requestsToAck.clear();

FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec, p.digest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)

readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
assertEquals(0, qp.getZxid());
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());

Expand All @@ -765,36 +765,22 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
qp.setZxid(0);
oa.writeRecord(qp, null);

// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());

//Wait for the transactions to be written out. The thread that writes them out
// does not send anything back when it is done.
long start = System.currentTimeMillis();
while (createSessionZxid != f.fzk.getLastProcessedZxid()
&& (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
}

assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
start = System.currentTimeMillis();
zkDb2.loadDataBase();
while (zkDb2.getSessionWithTimeOuts().isEmpty() && (System.currentTimeMillis() - start) < 50) {
Thread.sleep(1);
zkDb2.loadDataBase();
}
LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
Expand All @@ -820,6 +806,140 @@ private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throw
}, testData);
}

/**
 * Exercises the case from ZOOKEEPER-4394: the leader sends a COMMIT for an outstanding
 * proposal after the follower has acked NEWLEADER but before UPTODATE.
 *
 * <p>The scripted leader sends LEADERINFO, a SNAP, one PROPOSAL and NEWLEADER. After the
 * NEWLEADER ack it checks that the proposal is neither applied in memory nor visible when
 * the on-disk database is reloaded. It then sends COMMIT followed by UPTODATE and expects
 * two acks (one carrying the new-leader zxid, one carrying the proposal zxid), the change
 * to become visible in memory, and the change to be present when the database is reloaded
 * from disk.
 *
 * @param testData temporary directory supplied by JUnit for this test
 * @throws Exception if the scripted conversation fails
 */
@Test
public void testNormalFollowerRun_ProcessCommitInSyncAfterAckNewLeader(@TempDir File testData) throws Exception {
    testFollowerConversation(new FollowerConversation() {
        @Override
        public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
            // createTempFile only reserves a unique name; swap the file for a directory.
            File tmpDir = File.createTempFile("test", "dir", testData);
            tmpDir.delete();
            tmpDir.mkdir();
            File logDir = f.fzk.getTxnLogFactory().getDataLogDir().getParentFile();
            File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
            //Spy on ZK so we can check if a snapshot happened or not.
            f.zk = spy(f.zk);
            try {
                assertEquals(0, f.self.getAcceptedEpoch());
                assertEquals(0, f.self.getCurrentEpoch());

                // Setup a database with a single /foo node
                ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                final long firstZxid = ZxidUtils.makeZxid(1, 1);
                zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                Stat stat = new Stat();
                assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));

                QuorumPacket qp = new QuorumPacket();
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.FOLLOWERINFO, qp.getType());
                // assertEquals takes (expected, actual); keep that order so failure messages read correctly.
                assertEquals(0, qp.getZxid());
                LearnerInfo learnInfo = new LearnerInfo();
                ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                assertEquals(0x10000, learnInfo.getProtocolVersion());
                assertEquals(0, learnInfo.getServerid());

                // We are simulating an established leader, so the epoch is 1
                qp.setType(Leader.LEADERINFO);
                qp.setZxid(ZxidUtils.makeZxid(1, 0));
                byte[] protoBytes = new byte[4];
                ByteBuffer.wrap(protoBytes).putInt(0x10000);
                qp.setData(protoBytes);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACKEPOCH, qp.getType());
                // Compare the packet zxid against a zxid, and the 4-byte payload against a plain int.
                assertEquals(ZxidUtils.makeZxid(0, 0), qp.getZxid());
                assertEquals(0, ByteBuffer.wrap(qp.getData()).getInt());
                assertEquals(1, f.self.getAcceptedEpoch());
                assertEquals(0, f.self.getCurrentEpoch());

                // Send the snapshot we created earlier
                qp.setType(Leader.SNAP);
                qp.setData(new byte[0]);
                qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                oa.writeRecord(qp, null);
                zkDb.serializeSnapshot(oa);
                oa.writeString("BenWasHere", null);
                Thread.sleep(10); //Give it some time to process the snap
                //No Snapshot taken yet, the SNAP was applied in memory
                verify(f.zk, never()).takeSnapshot();

                // Leader sends an outstanding proposal
                long proposalZxid = ZxidUtils.makeZxid(1, 1001);
                proposeSetData(qp, proposalZxid, "data2", 2);
                oa.writeRecord(qp, null);

                qp.setType(Leader.NEWLEADER);
                qp.setZxid(ZxidUtils.makeZxid(1, 0));
                oa.writeRecord(qp, null);

                // Get the ack of the new leader
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACK, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(1, f.self.getAcceptedEpoch());
                assertEquals(1, f.self.getCurrentEpoch());
                //Make sure that we did take the snapshot now
                verify(f.zk).takeSnapshot(true);
                assertEquals(firstZxid, f.fzk.getLastProcessedZxid());

                // The outstanding proposal has not been persisted yet
                ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                long lastZxid = zkDb2.loadDataBase();
                assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
                assertEquals(firstZxid, lastZxid);

                TrackerWatcher watcher = new TrackerWatcher();

                // The change should not have happened yet
                assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));

                // Leader commits proposalZxid right after it sends NEWLEADER to follower
                qp.setType(Leader.COMMIT);
                qp.setZxid(proposalZxid);
                oa.writeRecord(qp, null);

                qp.setType(Leader.UPTODATE);
                qp.setZxid(0);
                oa.writeRecord(qp, null);

                // Read the uptodate ack
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACK, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                // Read the ack of the outstanding proposal
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACK, qp.getType());
                assertEquals(proposalZxid, qp.getZxid());

                // The change should happen now
                watcher.waitForChange();
                assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));

                // check and make sure the change is persisted
                zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                lastZxid = zkDb2.loadDataBase();
                assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
                assertEquals(proposalZxid, lastZxid);
            } finally {
                TestUtils.deleteFileRecursively(tmpDir);
            }
        }

        /**
         * Turns {@code qp} into a PROPOSAL packet carrying a setData txn on "/foo".
         *
         * @param qp packet to overwrite in place (type, zxid and data are replaced)
         * @param zxid zxid assigned to both the packet and the txn header
         * @param data new node content
         * @param version version recorded in the SetDataTxn
         * @throws IOException if serializing the header or txn fails
         */
        private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
            qp.setType(Leader.PROPOSAL);
            qp.setZxid(zxid);
            TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
            SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
            // Payload layout: serialized header followed by the serialized txn.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            OutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeRecord(hdr, null);
            boa.writeRecord(sdt, null);
            qp.setData(baos.toByteArray());
        }
    }, testData);
}

@Test
public void testNormalRun(@TempDir File testData) throws Exception {
testLeaderConversation(new LeaderConversation() {
Expand Down

0 comments on commit cdd13ca

Please sign in to comment.