+ ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ ApplySnapshot.class);
+ Snapshot snapshot = applySnapshot.getSnapshot();
+ assertNotNull(lastInstallSnapshot);
+ assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
+ assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
+ snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
+ snapshot.getLastAppliedIndex());
+ assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
+ assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+ Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
+ assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+ assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+ applySnapshot.getCallback().onSuccess();
+
+ List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
+ leaderActor, InstallSnapshotReply.class);
+ assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
+
+ chunkIndex = 1;
+ for (InstallSnapshotReply reply: replies) {
+ assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
+ assertEquals("getTerm", 1, reply.getTerm());
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
+ }
+
+ assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+ }
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.
+ */
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshot() {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(follower.getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(follower.getSnapshotTracker());
+
+ // Send an append entry
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+ assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+ assertNotNull(follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
+ logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+
+ // Check that snapshot installation is not in progress
+ assertNull(follower.getSnapshotTracker());
+
+ // Make sure that we have more than 1 chunk to send
+ assertTrue(totalChunks > 1);
+
+ // Send an install snapshot with the first chunk to start the process of installing a snapshot
+ byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+ follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, 1, totalChunks));
+
+ // Check if snapshot installation is in progress now
+ assertNotNull(follower.getSnapshotTracker());
+
+ // Send appendEntries with a new term and leader.
+ AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+ Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+
+ follower.handleMessage(leaderActor, appendEntries);
+
+ AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+ assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+ assertEquals("getTerm", 2, reply.getTerm());
+
+ assertNull(follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
+ logStart("testInitialSyncUpWithHandleInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+ context.setCommitIndex(-1);
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int offset = 0;
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+ int chunkIndex = 1;
+ InstallSnapshot lastInstallSnapshot = null;
+
+ for (int i = 0; i < totalChunks; i++) {
+ byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, chunkIndex, totalChunks);
+ follower.handleMessage(leaderActor, lastInstallSnapshot);
+ offset = offset + 50;
+ lastIncludedIndex++;
+ chunkIndex++;
+ }
+
+ FollowerInitialSyncUpStatus syncStatus =
+ MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+ assertFalse(syncStatus.isInitialSyncDone());
+
+ // Clear all the messages
+ MessageCollectorActor.clearMessages(followerActor);
+
+ context.setLastApplied(101);
+ context.setCommitIndex(101);
+ setLastLogEntry(context, 1, 101,
+ new MockRaftActorContext.MockPayload(""));
+
+ List<ReplicatedLogEntry> entries = Arrays.asList(
+ newReplicatedLogEntry(2, 101, "foo"));
+
+ // The new commitIndex is 101
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
+ follower.handleMessage(leaderActor, appendEntries);
+
+ syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+ assertTrue(syncStatus.isInitialSyncDone());
+ }
+
+ @Test
+ public void testHandleOutOfSequenceInstallSnapshot() {
+ logStart("testHandleOutOfSequenceInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+
+ InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
+ getNextChunk(bsSnapshot, 10, 50), 3, 3);
+ follower.handleMessage(leaderActor, installSnapshot);
+
+ InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+ InstallSnapshotReply.class);
+
+ assertEquals("isSuccess", false, reply.isSuccess());
+ assertEquals("getChunkIndex", -1, reply.getChunkIndex());
+ assertEquals("getTerm", 1, reply.getTerm());
+ assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
+
+ assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
+ MockRaftActorContext context = createActorContext();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+
+ follower = createBehavior(context);
+
+ TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+
+ long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+ assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+
+ RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+ assertTrue("Expected Candidate", newBehavior instanceof Candidate);
+ }
+
+ @Test
+ public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
+ MockRaftActorContext context = createActorContext();
+ context.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getElectionTimeOutInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);