Bug 2787: Batch AppendEntries to speed up follower sync
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 27854f40e957a7a395fcd3e8c0cd5df871b3e00e..ccde8bfb226fb4cb3b3dcbaa333139de1eb00524 100644 (file)
@@ -156,11 +156,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         followerActor.underlyingActor().clear();
 
-        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, lastIndex + 1, payload);
-        actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -171,7 +167,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entries size", 1, appendEntries.getEntries().size());
         assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
     }
 
     @Test
@@ -1124,12 +1120,15 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
         followerActorContext.setReplicatedLog(
                 new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
-        followerActorContext.setCommitIndex(1);
-        followerActorContext.setLastApplied(1);
+        followerActorContext.setCommitIndex(0);
+        followerActorContext.setLastApplied(0);
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
@@ -1151,26 +1150,37 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's second log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's third log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
 
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
         assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
         assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
     }
@@ -1189,6 +1199,9 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setCommitIndex(leaderCommitIndex);
         leaderActorContext.setLastApplied(leaderCommitIndex);
 
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
         MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
 
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
@@ -1215,26 +1228,37 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's first log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's second log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
 
+        List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+        ApplyState applyState = applyStateList.get(0);
+        assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
+        applyState = applyStateList.get(1);
+        assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+        assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+        assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+                applyState.getReplicatedLogEntry().getData());
+
         assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
         assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
     }
@@ -1283,22 +1307,21 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
-        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
-        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        // Verify AppendEntries sent with the leader's first log entry.
-        appendEntries = appendEntriesList.get(0);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex());
         assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
-        // Verify AppendEntries sent with the leader's third log entry.
-        appendEntries = appendEntriesList.get(1);
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("Log entries size", 1, appendEntries.getEntries().size());
-        assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
 
         FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
         assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
@@ -1381,6 +1404,88 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
+    @Test
+    public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+        logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2);
+
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+        long leaderCommitIndex = 3;
+        leaderActorContext.setCommitIndex(leaderCommitIndex);
+        leaderActorContext.setLastApplied(leaderCommitIndex);
+
+        ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+        ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+        ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+        ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+        followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
+
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        leader = new Leader(leaderActorContext);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+        MessageCollectorActor.clearMessages(leaderActor);
+
+        // Verify initial AppendEntries sent with the leader's current commit index.
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+        assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+        leaderActor.underlyingActor().setBehavior(leader);
+
+        leader.handleMessage(followerActor, appendEntriesReply);
+
+        List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+        MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+        appendEntries = appendEntriesList.get(0);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersFirstLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        appendEntries = appendEntriesList.get(1);
+        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+        assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("First entry data", leadersThirdLogEntry.getData(),
+                appendEntries.getEntries().get(0).getData());
+        assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+        assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+                appendEntries.getEntries().get(1).getData());
+
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+        MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+        assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+        assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+    }
+
     @Test
     public void testHandleRequestVoteReply(){
         logStart("testHandleRequestVoteReply");
@@ -1463,83 +1568,6 @@ public class LeaderTest extends AbstractLeaderTest {
         }};
     }
 
-
-    @Test
-    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        logStart("testAppendEntryCallAtEndofAppendEntryReply");
-
-        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
-
-        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
-        leaderActorContext.setConfigParams(configParams);
-
-        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
-
-        followerActorContext.setConfigParams(configParams);
-        followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
-
-        Follower follower = new Follower(followerActorContext);
-        followerActor.underlyingActor().setBehavior(follower);
-
-        leaderActorContext.getReplicatedLog().removeFrom(0);
-        leaderActorContext.setCommitIndex(-1);
-        leaderActorContext.setLastApplied(-1);
-
-        followerActorContext.getReplicatedLog().removeFrom(0);
-        followerActorContext.setCommitIndex(-1);
-        followerActorContext.setLastApplied(-1);
-
-        leader = new Leader(leaderActorContext);
-
-        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
-                leaderActor, AppendEntriesReply.class);
-
-        leader.handleMessage(followerActor, appendEntriesReply);
-
-        // Clear initial heartbeat messages
-
-        leaderActor.underlyingActor().clear();
-        followerActor.underlyingActor().clear();
-
-        // create 3 entries
-        leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-        leaderActorContext.setCommitIndex(1);
-        leaderActorContext.setLastApplied(1);
-
-        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
-
-        leader.handleMessage(leaderActor, new SendHeartBeat());
-
-        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send first log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-        assertEquals(-1, appendEntries.getPrevLogIndex());
-
-        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-
-        assertEquals(1, appendEntriesReply.getLogLastTerm());
-        assertEquals(0, appendEntriesReply.getLogLastIndex());
-
-        followerActor.underlyingActor().clear();
-
-        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
-
-        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        // Should send second log entry
-        assertEquals(1, appendEntries.getLeaderCommit());
-        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-
-        follower.close();
-    }
-
     @Test
     public void testLaggingFollowerStarvation() throws Exception {
         logStart("testLaggingFollowerStarvation");