Merge "Bring features/neutron into the same parent dir as everything else"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 0fc0b4ccfde37a802322e8d023c759f0e58ffd7e..8251c6b265bd5ca85e247119104b4e6b268be9f6 100644 (file)
@@ -1,11 +1,16 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -38,18 +43,14 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
 import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private ActorRef leaderActor =
+    private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
-    private ActorRef senderActor =
+    private final ActorRef senderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
 
     @Test
@@ -65,46 +66,53 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
                     MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    leader.handleMessage(senderActor, new SendHeartBeat());
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
+                    // Leader should send an immediate heartbeat with no entries as follower is inactive.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getTerm", term, appendEntries.getTerm());
+                    assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 0, appendEntries.getEntries().size());
 
-                    assertEquals("match", out);
+                    // The follower would normally reply - simulate that explicitly here.
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex - 1, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    // Sleep for the heartbeat interval so AppendEntries is sent.
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+                            getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+                    leader.handleMessage(senderActor, new SendHeartBeat());
 
+                    appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
                 }
             };
         }};
@@ -113,50 +121,51 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageSendAppendEntriesToFollower() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
-
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
                     Map<String, String> peerAddresses = new HashMap<>();
 
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                    String followerId = "follower";
+                    peerAddresses.put(followerId, followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
+                    long term = 1;
+                    actorContext.getTermInformation().update(term, "");
+
                     Leader leader = new Leader(actorContext);
-                    RaftActorBehavior raftBehavior = leader
-                        .handleMessage(senderActor, new Replicate(null, null,
-                            new MockRaftActorContext.MockReplicatedLogEntry(1,
-                                100,
-                                new MockRaftActorContext.MockPayload("foo"))
-                        ));
+
+                    // Leader will send an immediate heartbeat - ignore it.
+                    expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
+                    // The follower would normally reply - simulate that explicitly here.
+                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
+                    leader.handleMessage(followerActor, new AppendEntriesReply(
+                            followerId, term, true, lastIndex, term));
+                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+
+                    MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+                    MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                            1, lastIndex + 1, payload);
+                    actorContext.getReplicatedLog().append(newEntry);
+                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                            new Replicate(null, null, newEntry));
 
                     // State should not change
                     assertTrue(raftBehavior instanceof Leader);
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            protected String match(Object in) {
-                                Object msg = fromSerializableMessage(in);
-                                if (msg instanceof AppendEntries) {
-                                    if (((AppendEntries)msg).getTerm() == 0) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
-
-                    assertEquals("match", out);
+                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
+                    assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
+                    assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+                    assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
                 }
             };
         }};
@@ -165,8 +174,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleReplicateMessageWhenThereAreNoFollowers() {
         new JavaTestKit(getSystem()) {{
-
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -193,6 +202,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -264,10 +274,13 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
-                followerActor, AppendEntries.SERIALIZABLE_CLASS);
+            AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
+                followerActor, AppendEntries.class);
 
             assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
                 "received", aeproto);
@@ -281,9 +294,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leader.handleMessage(senderActor, new SendHeartBeat());
 
-            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
-                MessageCollectorActor.getFirstMatching(followerActor,
-                    InstallSnapshot.SERIALIZABLE_CLASS);
+            InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
+                InstallSnapshot.SERIALIZABLE_CLASS);
 
             assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
                 isproto);
@@ -338,6 +350,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -416,7 +431,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 leaderActor, new InitiateInstallSnapshot());
 
-            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+            CaptureSnapshot cs = MessageCollectorActor.
                 getFirstMatching(leaderActor, CaptureSnapshot.class);
 
             assertNotNull(cs);
@@ -426,6 +441,12 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(1, cs.getLastAppliedTerm());
             assertEquals(4, cs.getLastIndex());
             assertEquals(2, cs.getLastTerm());
+
+            // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+            raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+            List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+            assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+
         }};
     }
 
@@ -466,6 +487,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             Leader leader = new Leader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             // new entry
             ReplicatedLogImplEntry entry =
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -480,6 +504,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             final String out =
                 new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                     // do not put code outside this method, will run afterwards
+                    @Override
                     protected String match(Object in) {
                         if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
                             InstallSnapshot is = (InstallSnapshot)
@@ -532,6 +557,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             MockLeader leader = new MockLeader(actorContext);
 
+            // Ignore initial heartbeat.
+            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+
             Map<String, String> leadersSnapshot = new HashMap<>();
             leadersSnapshot.put("1", "A");
             leadersSnapshot.put("2", "B");
@@ -560,16 +588,256 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             assertTrue(raftBehavior instanceof Leader);
 
-            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-            assertEquals(leader.followerToLog.size(), 1);
-            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex, fli.getMatchIndex().get());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
+        }};
+    }
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            };
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            actorContext.setConfigParams(configParams);
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(1, objectList.size());
+
+            Object o = objectList.get(0);
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(2, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(3, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            // Count should still stay at 3
+            assertEquals(3, objectList.size());
+        }};
+    }
+
+
+    @Test
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            MessageCollectorActor.getAllMatching(followerActor,
+                    InstallSnapshotMessages.InstallSnapshot.class);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.underlyingActor().clear();
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            installSnapshot = MessageCollectorActor.getFirstMatching(
+                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+            assertNotNull(installSnapshot);
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
         }};
     }
 
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
+                        followerActor.path().toString());
+
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
+                        (MockRaftActorContext) createActorContext();
+
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
+                    }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
+
+                MockLeader leader = new MockLeader(actorContext);
+
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
+
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                followerActor.underlyingActor().clear();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                installSnapshot = MessageCollectorActor.getFirstMatching(
+                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
+                assertNotNull(installSnapshot);
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
+    }
+
     @Test
     public void testFollowerToSnapshotLogic() {
 
@@ -627,8 +895,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
-        return new MockRaftActorContext("test", getSystem(), actorRef);
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+        context.setConfigParams(configParams);
+        return context;
     }
 
     private ByteString toByteString(Map<String, String> state) {
@@ -657,43 +931,41 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
-        private static AbstractRaftActorBehavior behavior;
-
-        public ForwardMessageToBehaviorActor(){
-
-        }
+        AbstractRaftActorBehavior behavior;
 
         @Override public void onReceive(Object message) throws Exception {
+            if(behavior != null) {
+                behavior.handleMessage(sender(), message);
+            }
+
             super.onReceive(message);
-            behavior.handleMessage(sender(), message);
         }
 
-        public static void setBehavior(AbstractRaftActorBehavior behavior){
-            ForwardMessageToBehaviorActor.behavior = behavior;
+        public static Props props() {
+            return Props.create(ForwardMessageToBehaviorActor.class);
         }
     }
 
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
@@ -701,7 +973,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             //create 3 entries
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
@@ -709,35 +981,29 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // follower too has the exact same log entries and has the same commit index
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             followerActorContext.setCommitIndex(1);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
 
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
-
             assertNotNull(appendEntriesReply);
 
-            // follower returns its next index
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
         }};
     }
 
@@ -745,67 +1011,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
         new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(ForwardMessageToBehaviorActor.class));
 
             MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
 
-            ActorRef followerActor = getSystem().actorOf(
-                Props.create(ForwardMessageToBehaviorActor.class));
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
 
             MockRaftActorContext followerActorContext =
-                new MockRaftActorContext("follower", getSystem(), followerActor);
+                    new MockRaftActorContext("follower", getSystem(), followerActor);
 
             Follower follower = new Follower(followerActorContext);
-
-            ForwardMessageToBehaviorActor.setBehavior(follower);
+            followerActor.underlyingActor().behavior = follower;
 
             Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+            peerAddresses.put("follower", followerActor.path().toString());
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
             leaderActorContext.getReplicatedLog().removeFrom(0);
 
             leaderActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             leaderActorContext.setCommitIndex(1);
 
             followerActorContext.getReplicatedLog().removeFrom(0);
 
             followerActorContext.setReplicatedLog(
-                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
             // follower has the same log entries but its commit index > leaders commit index
             followerActorContext.setCommitIndex(2);
 
             Leader leader = new Leader(leaderActorContext);
-            leader.markFollowerActive(followerActor.path().toString());
-
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
 
+            // Initial heartbeat
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getEntries().size());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
-            AppendEntriesReply appendEntriesReply =
-                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
                     leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+            leaderActor.underlyingActor().behavior = leader;
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                    TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(0, appendEntries.getEntries().size());
+            assertEquals(2, appendEntries.getPrevLogIndex());
 
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
             assertNotNull(appendEntriesReply);
 
             assertEquals(2, appendEntriesReply.getLogLastIndex());
             assertEquals(1, appendEntriesReply.getLogLastTerm());
 
+            assertEquals(1, followerActorContext.getCommitIndex());
         }};
     }
 
@@ -879,8 +1161,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 assertEquals(2, leaderActorContext.getCommitIndex());
 
                 ApplyLogEntries applyLogEntries =
-                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
-                        ApplyLogEntries.class);
+                    MessageCollectorActor.getFirstMatching(leaderActor,
+                    ApplyLogEntries.class);
 
                 assertNotNull(applyLogEntries);
 
@@ -1014,6 +1296,98 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
+                    Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
+                    ForwardMessageToBehaviorActor.props());
+
+            MockRaftActorContext followerActorContext =
+                    new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+            followerActor.underlyingActor().behavior = follower;
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                    followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+            leaderActorContext.setCommitIndex(-1);
+            leaderActorContext.setLastApplied(-1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+            followerActorContext.setCommitIndex(-1);
+            followerActorContext.setLastApplied(-1);
+
+            Leader leader = new Leader(leaderActorContext);
+
+            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+            System.out.println("appendEntriesReply: "+appendEntriesReply);
+            leader.handleMessage(followerActor, appendEntriesReply);
+
+            // Clear initial heartbeat messages
+
+            leaderActor.underlyingActor().clear();
+            followerActor.underlyingActor().clear();
+
+            // create 3 entries
+            leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+            leaderActorContext.setCommitIndex(1);
+            leaderActorContext.setLastApplied(1);
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                    TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
+
+            // Should send first log entry
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(-1, appendEntries.getPrevLogIndex());
+
+            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+            assertEquals(0, appendEntriesReply.getLogLastIndex());
+
+            followerActor.underlyingActor().clear();
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
+            assertNotNull(appendEntries);
+
+            // Should send second log entry
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
@@ -1028,15 +1402,14 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
-
+            setFollowerSnapshot(followerId, fts);
         }
     }
 
     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
 
-        private long electionTimeOutIntervalMillis;
-        private int snapshotChunkSize;
+        private final long electionTimeOutIntervalMillis;
+        private final int snapshotChunkSize;
 
         public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
             super();