Merge "Bug-2692:Avoid fake snapshot during initiate snapshot"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 8b47e8e..666cea6 100644 (file)
@@ -87,6 +87,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
@@ -134,6 +137,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -271,6 +277,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
@@ -345,6 +354,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -583,9 +595,102 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            };
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            actorContext.setConfigParams(configParams);
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(1, objectList.size());
+
+            Object o = objectList.get(0);
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(2, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(3, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            // Count should still stay at 3
+            assertEquals(3, objectList.size());
+        }};
+    }
+
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
             TestActorRef<MessageCollectorActor> followerActor =
@@ -639,11 +744,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(3, installSnapshot.getTotalChunks());
 
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
 
             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 
@@ -662,7 +771,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             {
 
                 TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(followerActor.path().toString(),
@@ -716,10 +825,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                leader.handleMessage(leaderActor, new SendHeartBeat());
-
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 
                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
@@ -881,6 +990,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -949,6 +1061,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -1177,6 +1291,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.