Optimized AppendEntries sending to follower by adding AppendEntry call at end of...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index b31cb621b3576b1a9bcbaff321465d4bd186674e..63f94828eb4752ed6fc9367e9575f98d4bdc6d76 100644 (file)
@@ -86,6 +86,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
@@ -133,6 +136,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -270,6 +276,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
@@ -344,6 +353,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -578,7 +590,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
             TestActorRef<MessageCollectorActor> followerActor =
@@ -632,11 +644,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(3, installSnapshot.getTotalChunks());
 
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
 
             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 
@@ -655,7 +671,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             {
 
                 TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(followerActor.path().toString(),
@@ -709,10 +725,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                leader.handleMessage(leaderActor, new SendHeartBeat());
-
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 
                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
@@ -874,6 +890,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -942,6 +961,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -1170,6 +1191,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;