+ */
+ @Test
+ public void testHandleAppendEntriesCorrectReceiverLogEntries() {
+ logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
+
+ MockRaftActorContext context = createActorContext();
+
+ // First set the receivers term to lower number
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(newReplicatedLogEntry(1, 0, "zero"));
+ log.append(newReplicatedLogEntry(1, 1, "one"));
+ log.append(newReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+ entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+ // Send appendEntries with the same term as was set on the receiver
+ // before the new behavior was created (1 in this case)
+ // This will not work for a Candidate because as soon as a Candidate
+ // is created it increments the term
+ AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ // The entry at index 2 will be found out-of-sync with the leader
+ // and will be removed
+ // Then the two new entries will be added to the log
+ // Thus making the log to have 4 entries
+ assertEquals("Next index", 4, log.last().getIndex() + 1);
+ //assertEquals("Entry 2", entries.get(0), log.get(2));
+
+ assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
+
+ // Check that the entry at index 2 has the new data
+ assertEquals("Entry 2", entries.get(0), log.get(2));
+
+ assertEquals("Entry 3", entries.get(1), log.get(3));
+
+ expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
+ }
+
+ @Test
+ public void testHandleAppendEntriesPreviousLogEntryMissing(){
+ logStart("testHandleAppendEntriesPreviousLogEntryMissing");
+
+ MockRaftActorContext context = createActorContext();
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(newReplicatedLogEntry(1, 0, "zero"));
+ log.append(newReplicatedLogEntry(1, 1, "one"));
+ log.append(newReplicatedLogEntry(1, 2, "two"));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(newReplicatedLogEntry(1, 4, "four"));
+
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
+ }
+
+ @Test
+ public void testHandleAppendEntriesWithExistingLogEntry() {
+ logStart("testHandleAppendEntriesWithExistingLogEntry");
+
+ MockRaftActorContext context = createActorContext();
+
+ context.getTermInformation().update(1, "test");
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(newReplicatedLogEntry(1, 0, "zero"));
+ log.append(newReplicatedLogEntry(1, 1, "one"));
+
+ context.setReplicatedLog(log);
+
+ // Send the last entry again.
+ List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
+
+ follower = createBehavior(context);
+
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+
+ assertEquals("Next index", 2, log.last().getIndex() + 1);
+ assertEquals("Entry 1", entries.get(0), log.get(1));
+
+ expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
+
+ // Send the last entry again and also a new one.
+
+ entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
+
+ leaderActor.underlyingActor().clear();
+ follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+
+ assertEquals("Next index", 3, log.last().getIndex() + 1);
+ assertEquals("Entry 1", entries.get(0), log.get(1));
+ assertEquals("Entry 2", entries.get(1), log.get(2));
+
+ expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
+ }
+
+ @Test
+ public void testHandleAppendEntriesAfterInstallingSnapshot(){
+ logStart("testHandleAppendAfterInstallingSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+
+ // Set up a log as if it has been snapshotted
+ log.setSnapshotIndex(3);
+ log.setSnapshotTerm(1);
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(newReplicatedLogEntry(1, 4, "four"));
+
+ AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+
+ follower = createBehavior(context);
+
+ RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+ Assert.assertSame(follower, newBehavior);
+
+ expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
+ }
+
+
+ /**
+ * This test verifies that when InstallSnapshot is received by
+ * the follower its applied correctly.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHandleInstallSnapshot() throws Exception {
+ logStart("testHandleInstallSnapshot");
+
+ MockRaftActorContext context = createActorContext();
+
+ follower = createBehavior(context);
+
+ ByteString bsSnapshot = createSnapshot();
+ int offset = 0;
+ int snapshotLength = bsSnapshot.size();
+ int chunkSize = 50;
+ int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+ int lastIncludedIndex = 1;
+ int chunkIndex = 1;
+ InstallSnapshot lastInstallSnapshot = null;
+
+ for(int i = 0; i < totalChunks; i++) {
+ ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+ lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+ chunkData, chunkIndex, totalChunks);
+ follower.handleMessage(leaderActor, lastInstallSnapshot);
+ offset = offset + 50;
+ lastIncludedIndex++;
+ chunkIndex++;
+ }
+
+ ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+ ApplySnapshot.class);
+ Snapshot snapshot = applySnapshot.getSnapshot();
+ assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
+ assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
+ snapshot.getLastAppliedTerm());
+ assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
+ snapshot.getLastAppliedIndex());
+ assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
+ Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+
+ List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
+ leaderActor, InstallSnapshotReply.class);
+ assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
+
+ chunkIndex = 1;
+ for(InstallSnapshotReply reply: replies) {
+ assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
+ assertEquals("getTerm", 1, reply.getTerm());
+ assertEquals("isSuccess", true, reply.isSuccess());
+ assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
+ }
+
+ assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+ }
+
+
+ /**
+ * Verify that when an AppendEntries is sent to a follower during a snapshot install
+ * the Follower short-circuits the processing of the AppendEntries message.