List<ReplicatedLogEntry> unAppliedEntry;
ApplySnapshot applySnapshot;
InstallSnapshot installSnapshot;
- InstallSnapshotReply installSnapshotReply;
testLog.info("testInstallSnapshotToLaggingFollower starting");
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
// Now stop dropping AppendEntries in follower 2.
follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
+ // the snapshot store because the second snapshot was initiated by the follower install snapshot and
+ // not because the batch count was reached so the persisted journal sequence number wasn't advanced
+ // far enough to cause the previous snapshot to be deleted. This is because
+ // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
+ // This is OK - the next snapshot should delete it. In production, even if the system restarted
+ // before another snapshot, they would both get applied which wouldn't hurt anything.
+ persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
+ Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
+ Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
+ verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
+ unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
+ assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
+
+ int snapshotSize = persistedSnapshot.getState().length;
+ int expTotalChunks = (snapshotSize / SNAPSHOT_CHUNK_SIZE) + ((snapshotSize % SNAPSHOT_CHUNK_SIZE) > 0 ? 1 : 0);
+
installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, InstallSnapshot.class);
assertEquals("InstallSnapshot getTerm", currentTerm, installSnapshot.getTerm());
assertEquals("InstallSnapshot getLeaderId", leaderId, installSnapshot.getLeaderId());
assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
- assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
+ assertEquals("InstallSnapshot getTotalChunks", expTotalChunks, installSnapshot.getTotalChunks());
assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
assertEquals("InstallSnapshot getLastIncludedIndex", lastAppliedIndex, installSnapshot.getLastIncludedIndex());
//assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
- installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
- assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
- assertEquals("InstallSnapshotReply getChunkIndex", 1, installSnapshotReply.getChunkIndex());
- assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
- assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ List<InstallSnapshotReply> installSnapshotReplies = MessageCollectorActor.expectMatching(
+ leaderCollectorActor, InstallSnapshotReply.class, expTotalChunks);
+ int index = 1;
+ for(InstallSnapshotReply installSnapshotReply: installSnapshotReplies) {
+ assertEquals("InstallSnapshotReply getTerm", currentTerm, installSnapshotReply.getTerm());
+ assertEquals("InstallSnapshotReply getChunkIndex", index++, installSnapshotReply.getChunkIndex());
+ assertEquals("InstallSnapshotReply getFollowerId", follower2Id, installSnapshotReply.getFollowerId());
+ assertEquals("InstallSnapshotReply isSuccess", true, installSnapshotReply.isSuccess());
+ }
// Verify follower 2 applies the snapshot.
applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
// the log. In addition replicatedToAllIndex should've advanced.
verifyLeadersTrimmedLog(lastAppliedIndex);
- // Verify the leader's persisted snapshot. The previous snapshot (currently) won't be deleted from
- // the snapshot store because the second snapshot was initiated by the follower install snapshot and
- // not because the batch count was reached so the persisted journal sequence number wasn't advanced
- // far enough to cause the previous snapshot to be deleted. This is because
- // RaftActor#trimPersistentData subtracts the snapshotBatchCount from the snapshot's sequence number.
- // This is OK - the next snapshot should delete it. In production, even if the system restarted
- // before another snapshot, they would both get applied which wouldn't hurt anything.
- persistedSnapshots = InMemorySnapshotStore.getSnapshots(leaderId, Snapshot.class);
- Assert.assertTrue("Expected at least 1 persisted snapshots", persistedSnapshots.size() > 0);
- Snapshot persistedSnapshot = persistedSnapshots.get(persistedSnapshots.size() - 1);
- verifySnapshot("Persisted", persistedSnapshot, currentTerm, lastAppliedIndex, currentTerm, lastAppliedIndex);
- unAppliedEntry = persistedSnapshot.getUnAppliedEntries();
- assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
-
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
MessageCollectorActor.clearMessages(follower2CollectorActor);