Merge "Do not use ActorSystem.actorFor as it is deprecated"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index c57fce1cd553d8c41dd786d9c4bb6f33e97791b6..383ebefd36685f6298524ef87353281c325d1abd 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
@@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
 
 public class LeaderTest extends AbstractLeaderTest {
@@ -120,6 +119,15 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
     }
 
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, index, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
+    }
+
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
         logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
@@ -146,8 +154,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
                 1, lastIndex + 1, payload);
         actorContext.getReplicatedLog().append(newEntry);
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(null, null, newEntry));
+        RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1);
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -161,6 +168,218 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
     }
 
+    @Test
+    public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<5;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect only 1 message to be sent because of two reasons,
+        // - an append entries reply was not received
+        // - the heartbeat interval has not expired
+        // In this scenario if multiple messages are sent they would likely be duplicates
+        assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
+    }
+
+    @Test
+    public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+        logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(5, TimeUnit.SECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            sendReplicate(actorContext, lastIndex+i+1);
+            leader.handleMessage(followerActor, new AppendEntriesReply(
+                    FOLLOWER_ID, term, true, lastIndex + i + 1, term));
+
+        }
+
+        for(int i=3;i<5;i++) {
+            sendReplicate(actorContext, lastIndex + i + 1);
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+        // get sent to the follower - but not the 5th
+        assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
+
+        for(int i=0;i<4;i++) {
+            long expected = allMessages.get(i).getEntries().get(0).getIndex();
+            assertEquals(expected, i+2);
+        }
+    }
+
+    @Test
+    public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+        logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        sendReplicate(actorContext, lastIndex+1);
+
+        // Wait slightly longer than heartbeat duration
+        Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
+
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(1, allMessages.get(0).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+        assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+    }
+
+    @Test
+    public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+        logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        for(int i=0;i<3;i++) {
+            Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
+    }
+
+    @Test
+    public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+        logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getHeartBeatInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+        followerActor.underlyingActor().clear();
+
+        Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        sendReplicate(actorContext, lastIndex+1);
+
+        List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+        assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
+
+        assertEquals(0, allMessages.get(0).getEntries().size());
+        assertEquals(1, allMessages.get(1).getEntries().size());
+    }
+
+
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
         logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
@@ -265,10 +484,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new SendHeartBeat());
 
-        InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
-                InstallSnapshot.SERIALIZABLE_CLASS);
-
-        InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+        InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(snapshotIndex, is.getLastIncludedIndex());
     }
@@ -417,8 +633,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         // check if installsnapshot gets called with the correct values.
 
-        InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
-                MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
         assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
@@ -522,8 +737,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -532,8 +746,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(2, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -542,16 +755,14 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
         followerActor.underlyingActor().clear();
         leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
                 FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-        installSnapshot = MessageCollectorActor.getFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
 
         Assert.assertNull(installSnapshot);
     }
@@ -592,10 +803,10 @@ public class LeaderTest extends AbstractLeaderTest {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Optional.of(bs));
 
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -610,8 +821,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new SendHeartBeat());
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
@@ -654,12 +864,11 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
 
         int hashCode = installSnapshot.getData().hashCode();
 
@@ -668,12 +877,11 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
                 FOLLOWER_ID, 1, true));
 
-        installSnapshot = MessageCollectorActor.expectFirstMatching(
-                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertEquals(2, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+        assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
     }
 
     @Test
@@ -929,12 +1137,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, leaderActorContext.getCommitIndex());
 
-        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
-                leaderActor, ApplyLogEntries.class);
+        ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyJournalEntries.class);
 
         assertEquals(2, leaderActorContext.getLastApplied());
 
-        assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(2, applyJournalEntries.getToIndex());
 
         List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
                 ApplyState.class);