+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
+
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+ logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setCommitIndex(-1);
+ actorContext.setLastApplied(-1);
+
+ // The raft context is initialized with a couple log entries. However the commitIndex
+ // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+ // committed and applied. Now it regains leadership with a higher term (2).
+ long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+ long newTerm = prevTerm + 1;
+ actorContext.getTermInformation().update(newTerm, "");
+
+ leader = new Leader(actorContext);
+ actorContext.setCurrentBehavior(leader);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower replies with the leader's current last index and term, simulating that it is
+ // up to date with the leader.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+ // The commit index should not get updated even though consensus was reached. This is b/c the
+ // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+ // from previous terms by counting replicas".
+ assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+ followerActor.underlyingActor().clear();
+
+ // Now replicate a new entry with the new term 2.
+ long newIndex = lastIndex + 1;
+ sendReplicate(actorContext, newTerm, newIndex);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+ // The follower replies with success. The leader should now update the commit index to the new index
+ // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+ // prior entries are committed indirectly".
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+ assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+ }
+
+ @Test
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setRaftPolicy(createRaftPolicy(true, true));
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);