Streamline updating out-of-sync follower
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 9094a656540da3fd422e8a445288339a1ce69c8a..27854f40e957a7a395fcd3e8c0cd5df871b3e00e 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
@@ -973,6 +974,15 @@ public class LeaderTest extends AbstractLeaderTest {
         return context;
     }
 
+    private MockRaftActorContext createFollowerActorContextWithLeader() {
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+        DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+        followerConfig.setElectionTimeoutFactor(10000);
+        followerActorContext.setConfigParams(followerConfig);
+        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+        return followerActorContext;
+    }
+
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
@@ -1101,26 +1111,215 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Test
-    public void testHandleAppendEntriesReplyFailure(){
-        logStart("testHandleAppendEntriesReplyFailure");
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        long leaderCommitIndex = 2;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(1);
+        followerActorContext.setLastApplied(1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
-        // Send initial heartbeat reply with last index.
-        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1, (short)0));
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Verify AppendEntries sent with the leader's second log entry.
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+        // Verify AppendEntries sent with the leader's third log entry.
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
-        assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
+        assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
 
-        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1, (short)0);
+        assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+    }
 
-        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
 
-        assertEquals(RaftState.Leader, raftActorBehavior.state());
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
-        assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Verify AppendEntries sent with the leader's first log entry.
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+        // Verify AppendEntries sent with the leader's second log entry.
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+        logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+        long leaderCommitIndex = 1;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Verify AppendEntries sent with the leader's first log entry.
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+        // Verify AppendEntries sent with the leader's third log entry.
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+        assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
     }
 
     @Test