Merge "Bug-2692:Avoid fake snapshot during initiate snapshot"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 48543d7de2178e937a82d31e03b71282ffdcb88b..666cea69ec891754687b262db72f26d9012fbb0c 100644 (file)
@@ -1,20 +1,35 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -22,17 +37,11 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -40,9 +49,9 @@ import static org.junit.Assert.assertTrue;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private ActorRef leaderActor =
+    private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
-    private ActorRef senderActor =
+    private final ActorRef senderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
 
     @Test
@@ -58,20 +67,19 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-
     @Test
     public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
-                    Map<String, String> peerAddresses = new HashMap();
+                    Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
                         followerActor.path().toString());
@@ -79,11 +87,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -109,6 +121,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef followerActor = getTestActor();
@@ -116,14 +129,17 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     MockRaftActorContext actorContext =
                         (MockRaftActorContext) createActorContext();
 
-                    Map<String, String> peerAddresses = new HashMap();
+                    Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+                            followerActor.path().toString());
 
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -137,6 +153,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 Object msg = fromSerializableMessage(in);
                                 if (msg instanceof AppendEntries) {
@@ -151,10 +168,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         }.get(); // this extracts the received message
 
                     assertEquals("match", out);
-
                 }
-
-
             };
         }};
     }
@@ -164,6 +178,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     ActorRef raftActor = getTestActor();
@@ -190,6 +205,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         new ExpectMsg<String>(duration("1 seconds"),
                             "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected String match(Object in) {
                                 if (in instanceof ApplyState) {
                                     if (((ApplyState) in).getIdentifier().equals("state-id")) {
@@ -210,230 +226,621 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testSendInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext(getRef());
-                    actorContext.setPeerAddresses(peerAddresses);
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
+            //set follower timeout to 2 mins, helps during debugging
+            actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            MockLeader leader = new MockLeader(actorContext);
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
 
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
 
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
+            //send first chunk and no InstallSnapshotReply received yet
+            leader.getFollowerToSnapshot().getNextChunk();
+            leader.getFollowerToSnapshot().incrementChunkIndex();
 
-                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-                    RaftActorBehavior raftBehavior = leader.handleMessage(
-                        senderActor, new Replicate(null, "state-id", entry));
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-                    assertTrue(raftBehavior instanceof Leader);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
 
-                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
-                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
-                        @Override
-                        protected Boolean match(Object o) throws Exception {
-                            if (o instanceof SendInstallSnapshot) {
-                                return true;
-                            }
-                            return false;
-                        }
-                    }.get();
+            AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
+                followerActor, AppendEntries.class);
 
-                    boolean sendInstallSnapshotReceived = false;
-                    for (Boolean b: matches) {
-                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
-                    }
+            assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
+                "received", aeproto);
+
+            AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+
+            assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+
+            //InstallSnapshotReply received
+            leader.getFollowerToSnapshot().markSendStatus(true);
+
+            leader.handleMessage(senderActor, new SendHeartBeat());
+
+            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
+                MessageCollectorActor.getFirstMatching(followerActor,
+                    InstallSnapshot.SERIALIZABLE_CLASS);
+
+            assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
+                isproto);
+
+            InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+
+            assertEquals(snapshotIndex, is.getLastIncludedIndex());
+
+        }};
+    }
+
+    @Test
+    public void testSendAppendEntriesSnapshotScenario() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(getRef());
+            actorContext.setPeerAddresses(peerAddresses);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    assertTrue(sendInstallSnapshotReceived);
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
 
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                senderActor, new Replicate(null, "state-id", entry));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+            Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                @Override
+                protected Boolean match(Object o) throws Exception {
+                    if (o instanceof InitiateInstallSnapshot) {
+                        return true;
+                    }
+                    return false;
                 }
-            };
+            }.get();
+
+            boolean initiateInitiateInstallSnapshot = false;
+            for (Boolean b: matches) {
+                initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+            }
+
+            assertTrue(initiateInitiateInstallSnapshot);
         }};
     }
 
     @Test
-    public void testInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testInitiateInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setLastApplied(3);
+            actorContext.setCommitIndex(followersLastIndex);
 
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+            Leader leader = new Leader(actorContext);
+            // set the snapshot as absent and check if capture-snapshot is invoked.
+            leader.setSnapshot(Optional.<ByteString>absent());
 
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor, new SendInstallSnapshot());
+            actorContext.getReplicatedLog().append(entry);
 
-                    assertTrue(raftBehavior instanceof Leader);
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                leaderActor, new InitiateInstallSnapshot());
 
-                    // check if installsnapshot gets called with the correct values.
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            protected String match(Object in) {
-                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
-                                    InstallSnapshot is = (InstallSnapshot)
-                                        SerializationUtils.fromSerializable(in);
-                                    if (is.getData() == null) {
-                                        return "InstallSnapshot data is null";
-                                    }
-                                    if (is.getLastIncludedIndex() != snapshotIndex) {
-                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
-                                    }
-                                    if (is.getLastIncludedTerm() != snapshotTerm) {
-                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
-                                    }
-                                    if (is.getTerm() == currentTerm) {
-                                        return is.getTerm() + "!=" + currentTerm;
-                                    }
+            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+                getFirstMatching(leaderActor, CaptureSnapshot.class);
 
-                                    return "match";
+            assertNotNull(cs);
 
-                               } else {
-                                    return "message mismatch:" + in.getClass();
-                                }
+            assertTrue(cs.isInstallSnapshotInitiated());
+            assertEquals(3, cs.getLastAppliedIndex());
+            assertEquals(1, cs.getLastAppliedTerm());
+            assertEquals(4, cs.getLastIndex());
+            assertEquals(2, cs.getLastTerm());
+
+            // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+            raftBehavior = leader.handleMessage(leaderActor, new InitiateInstallSnapshot());
+            List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+            assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+
+        }};
+    }
+
+    @Test
+    public void testInstallSnapshot() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            actorContext.setCommitIndex(followersLastIndex);
+
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // check if installsnapshot gets called with the correct values.
+            final String out =
+                new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    // do not put code outside this method, will run afterwards
+                    @Override
+                    protected String match(Object in) {
+                        if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                            InstallSnapshot is = (InstallSnapshot)
+                                SerializationUtils.fromSerializable(in);
+                            if (is.getData() == null) {
+                                return "InstallSnapshot data is null";
+                            }
+                            if (is.getLastIncludedIndex() != snapshotIndex) {
+                                return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                            }
+                            if (is.getLastIncludedTerm() != snapshotTerm) {
+                                return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                            }
+                            if (is.getTerm() == currentTerm) {
+                                return is.getTerm() + "!=" + currentTerm;
                             }
-                        }.get(); // this extracts the received message
 
-                    assertEquals("match", out);
+                            return "match";
+
+                        } else {
+                            return "message mismatch:" + in.getClass();
+                        }
+                    }
+                }.get(); // this extracts the received message
+
+            assertEquals("match", out);
+        }};
+    }
+
+    @Test
+    public void testHandleInstallSnapshotReplyLastChunk() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+            while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                leader.getFollowerToSnapshot().getNextChunk();
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                    leader.getFollowerToSnapshot().getChunkIndex(), true));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            assertEquals(0, leader.followerSnapshotSize());
+            assertEquals(1, leader.followerLogSize());
+            assertNotNull(leader.getFollower(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex, fli.getMatchIndex());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex());
+        }};
+    }
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
                 }
             };
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            actorContext.setConfigParams(configParams);
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(1, objectList.size());
+
+            Object o = objectList.get(0);
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(2, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(3, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            // Count should still stay at 3
+            assertEquals(3, objectList.size());
         }};
     }
 
+
     @Test
-    public void testHandleInstallSnapshotReplyLastChunk() {
-        new LeaderTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+        new JavaTestKit(getSystem()) {{
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
+            TestActorRef<MessageCollectorActor> followerActor =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                    followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                    (MockRaftActorContext) createActorContext();
+
+            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            });
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
+
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            followerActor.tell(PoisonPill.getInstance(), getRef());
+        }};
+    }
+
+    @Test
+    public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                TestActorRef<MessageCollectorActor> followerActor =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put(followerActor.path().toString(),
                         followerActor.path().toString());
 
-                    MockRaftActorContext actorContext =
+                final int followersLastIndex = 2;
+                final int snapshotIndex = 3;
+                final int snapshotTerm = 1;
+                final int currentTerm = 2;
+
+                MockRaftActorContext actorContext =
                         (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
-
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
-
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
-
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
-                    ByteString bs = toByteString(leadersSnapshot);
-                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
-                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
-                        leader.getFollowerToSnapshot().getNextChunk();
-                        leader.getFollowerToSnapshot().incrementChunkIndex();
+                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+                    @Override
+                    public int getSnapshotChunkSize() {
+                        return 50;
                     }
+                });
+                actorContext.setPeerAddresses(peerAddresses);
+                actorContext.setCommitIndex(followersLastIndex);
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+                MockLeader leader = new MockLeader(actorContext);
 
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
-                            leader.getFollowerToSnapshot().getChunkIndex(), true));
+                Map<String, String> leadersSnapshot = new HashMap<>();
+                leadersSnapshot.put("1", "A");
+                leadersSnapshot.put("2", "B");
+                leadersSnapshot.put("3", "C");
 
-                    assertTrue(raftBehavior instanceof Leader);
+                // set the snapshot variables in replicatedlog
+                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-                    assertEquals(leader.followerToLog.size(), 1);
-                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
-                }
-            };
-        }};
+                ByteString bs = toByteString(leadersSnapshot);
+                leader.setSnapshot(Optional.of(bs));
+
+                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+                Object o = MessageCollectorActor.getAllMessages(followerActor).get(0);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(1, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+
+                int hashCode = installSnapshot.getData().hashCode();
+
+                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
+                o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+
+                assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+                installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+                assertEquals(2, installSnapshot.getChunkIndex());
+                assertEquals(3, installSnapshot.getTotalChunks());
+                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+
+                followerActor.tell(PoisonPill.getInstance(), getRef());
+            }};
     }
 
     @Test
@@ -493,6 +900,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return createActorContext(leaderActor);
     }
 
+    @Override
     protected RaftActorContext createActorContext(ActorRef actorRef) {
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
@@ -557,7 +965,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             ForwardMessageToBehaviorActor.setBehavior(follower);
 
-            Map<String, String> peerAddresses = new HashMap();
+            Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put(followerActor.path().toString(),
                 followerActor.path().toString());
 
@@ -580,17 +988,20 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             followerActorContext.setCommitIndex(1);
 
             Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntries.class);
 
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
             AppendEntriesReply appendEntriesReply =
@@ -626,7 +1037,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             ForwardMessageToBehaviorActor.setBehavior(follower);
 
-            Map<String, String> peerAddresses = new HashMap();
+            Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put(followerActor.path().toString(),
                 followerActor.path().toString());
 
@@ -648,17 +1059,19 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             followerActorContext.setCommitIndex(2);
 
             Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
+
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            AppendEntriesMessages.AppendEntries appendEntries =
-                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
-                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+            AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntries.class);
 
             assertNotNull(appendEntries);
 
             assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
             assertEquals(0, appendEntries.getPrevLogIndex());
 
             AppendEntriesReply appendEntriesReply =
@@ -673,28 +1086,288 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
-    private static class LeaderTestKit extends JavaTestKit {
+    @Test
+    public void testHandleAppendEntriesReplyFailure(){
+        new JavaTestKit(getSystem()) {
+            {
 
-        private LeaderTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
-        }
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-            new JavaTestKit.EventFilter<Boolean>(logLevel
-            ) {
-                @Override
-                protected Boolean run() {
-                    return true;
-                }
-            }.from(subject.path().toString())
-                .message(logMessage)
-                .occurrences(1).exec();
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-            Assert.assertEquals(true, result);
 
-        }
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplySuccess() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+                leaderActorContext.setCommitIndex(1);
+                leaderActorContext.setLastApplied(1);
+                leaderActorContext.getTermInformation().update(1, "leader");
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                assertEquals(2, leaderActorContext.getCommitIndex());
+
+                ApplyLogEntries applyLogEntries =
+                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
+                        ApplyLogEntries.class);
+
+                assertNotNull(applyLogEntries);
+
+                assertEquals(2, leaderActorContext.getLastApplied());
+
+                assertEquals(2, applyLogEntries.getToIndex());
+
+                List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+                    ApplyState.class);
+
+                assertEquals(1,applyStateList.size());
+
+                ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+                assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyUnknownFollower(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleRequestVoteReply(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+            }};
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckNoFollowers() {
+        new JavaTestKit(getSystem()) {{
+            ActorRef leaderActor = getTestActor();
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue(behavior instanceof Leader);
+        }};
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor1 = getTestActor();
+            ActorRef followerActor2 = getTestActor();
+
+            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-1", followerActor1.path().toString());
+            peerAddresses.put("follower-2", followerActor2.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.stopIsolatedLeaderCheckSchedule();
+
+            leader.markFollowerActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+                behavior instanceof Leader);
+
+            // kill 1 follower and verify if that got killed
+            final JavaTestKit probe = new JavaTestKit(getSystem());
+            probe.watch(followerActor1);
+            followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg1.getActor(), followerActor1);
+
+            leader.markFollowerInActive("follower-1");
+            leader.markFollowerActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+                behavior instanceof Leader);
+
+            // kill 2nd follower and leader should change to Isolated leader
+            followerActor2.tell(PoisonPill.getInstance(), null);
+            probe.watch(followerActor2);
+            followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+            assertEquals(termMsg2.getActor(), followerActor2);
+
+            leader.markFollowerInActive("follower-2");
+            behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+            Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+                behavior instanceof IsolatedLeader);
+
+        }};
+    }
+
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
     }
 
     class MockLeader extends Leader {
@@ -705,22 +1378,35 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             super(context);
         }
 
-        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(nextIndex),
-                    new AtomicLong(matchIndex));
-            followerToLog.put(followerId, followerLogInformation);
-        }
-
         public FollowerToSnapshot getFollowerToSnapshot() {
             return fts;
         }
 
         public void createFollowerToSnapshot(String followerId, ByteString bs ) {
             fts = new FollowerToSnapshot(bs);
-            mapFollowerToSnapshot.put(followerId, fts);
+            setFollowerSnapshot(followerId, fts);
+        }
+    }
+
+    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+        private final long electionTimeOutIntervalMillis;
+        private final int snapshotChunkSize;
+
+        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+            super();
+            this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+            this.snapshotChunkSize = snapshotChunkSize;
+        }
+
+        @Override
+        public FiniteDuration getElectionTimeOutInterval() {
+            return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+        }
 
+        @Override
+        public int getSnapshotChunkSize() {
+            return snapshotChunkSize;
         }
     }
 }