Merge "Fix raw references to Promise"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index c4ef51d968422533f9df668bb23fd56563dc2ad2..168eb3e5f22c9752dcbe089fe2e87393713d2650 100644 (file)
@@ -1,36 +1,44 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
-import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -51,8 +59,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             // handle message should return the Leader state when it receives an
             // unknown message
-            RaftState state = leader.handleMessage(senderActor, "foo");
-            Assert.assertEquals(RaftState.Leader, state);
+            RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
+            Assert.assertTrue(behavior instanceof Leader);
         }};
     }
 
@@ -66,10 +74,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
 
-                    Map<String, String> peerAddresses = new HashMap();
+                    Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
                         followerActor.path().toString());
@@ -114,7 +121,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     MockRaftActorContext actorContext =
                         (MockRaftActorContext) createActorContext();
 
-                    Map<String, String> peerAddresses = new HashMap();
+                    Map<String, String> peerAddresses = new HashMap<>();
 
                     peerAddresses.put(followerActor.path().toString(),
                         followerActor.path().toString());
@@ -122,7 +129,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
+                    RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
                                 100,
@@ -130,7 +137,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         ));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     final String out =
                         new ExpectMsg<String>(duration("1 seconds"), "match hint") {
@@ -149,10 +156,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                         }.get(); // this extracts the received message
 
                     assertEquals("match", out);
-
                 }
-
-
             };
         }};
     }
@@ -171,21 +175,16 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                     actorContext.getReplicatedLog().removeFrom(0);
 
-                    actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(0, 1,
-                        new MockRaftActorContext.MockPayload("foo")));
-
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(1, 1,
-                            new MockRaftActorContext.MockPayload("foo"));
-
-                    actorContext.getReplicatedLog().append(entry);
+                    actorContext.setReplicatedLog(
+                        new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
+                            .build());
 
                     Leader leader = new Leader(actorContext);
-                    RaftState raftState = leader
-                        .handleMessage(senderActor, new Replicate(null, "state-id",entry));
+                    RaftActorBehavior raftBehavior = leader
+                        .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
 
                     // State should not change
-                    assertEquals(RaftState.Leader, raftState);
+                    assertTrue(raftBehavior instanceof Leader);
 
                     assertEquals(1, actorContext.getCommitIndex());
 
@@ -213,230 +212,360 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testSendInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext(getRef());
-                    actorContext.setPeerAddresses(peerAddresses);
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
+            //set follower timeout to 2 mins, helps during debugging
+            actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            MockLeader leader = new MockLeader(actorContext);
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
-
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
-
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
-
-                    // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-                    RaftState raftState = leader.handleMessage(
-                        senderActor, new Replicate(null, "state-id", entry));
-
-                    assertEquals(RaftState.Leader, raftState);
-
-                    // we might receive some heartbeat messages, so wait till we SendInstallSnapshot
-                    Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
-                        @Override
-                        protected Boolean match(Object o) throws Exception {
-                            if (o instanceof SendInstallSnapshot) {
-                                return true;
-                            }
-                            return false;
-                        }
-                    }.get();
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
-                    boolean sendInstallSnapshotReceived = false;
-                    for (Boolean b: matches) {
-                        sendInstallSnapshotReceived = b | sendInstallSnapshotReceived;
-                    }
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+
+            //send first chunk and no InstallSnapshotReply received yet
+            leader.getFollowerToSnapshot().getNextChunk();
+            leader.getFollowerToSnapshot().incrementChunkIndex();
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
+                followerActor, AppendEntries.SERIALIZABLE_CLASS);
+
+            assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
+                "received", aeproto);
+
+            AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
 
-                    assertTrue(sendInstallSnapshotReceived);
+            assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
+            //InstallSnapshotReply received
+            leader.getFollowerToSnapshot().markSendStatus(true);
+
+            leader.handleMessage(senderActor, new SendHeartBeat());
+
+            InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
+                MessageCollectorActor.getFirstMatching(followerActor,
+                    InstallSnapshot.SERIALIZABLE_CLASS);
+
+            assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
+                isproto);
+
+            InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+
+            assertEquals(snapshotIndex, is.getLastIncludedIndex());
+
+        }};
+    }
+
+    @Test
+    public void testSendAppendEntriesSnapshotScenario() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(getRef());
+            actorContext.setPeerAddresses(peerAddresses);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            //update follower timestamp
+            leader.markFollowerActive(followerActor.path().toString());
+
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                senderActor, new Replicate(null, "state-id", entry));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
+            Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
+                @Override
+                protected Boolean match(Object o) throws Exception {
+                    if (o instanceof InitiateInstallSnapshot) {
+                        return true;
+                    }
+                    return false;
                 }
-            };
+            }.get();
+
+            boolean initiateInitiateInstallSnapshot = false;
+            for (Boolean b: matches) {
+                initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
+            }
+
+            assertTrue(initiateInitiateInstallSnapshot);
         }};
     }
 
     @Test
-    public void testInstallSnapshot() {
-        new LeaderTestKit(getSystem()) {{
+    public void testInitiateInstallSnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {{
 
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
 
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext(leaderActor);
+            actorContext.setPeerAddresses(peerAddresses);
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
 
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.setLastApplied(3);
+            actorContext.setCommitIndex(followersLastIndex);
 
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
+            Leader leader = new Leader(actorContext);
+            // set the snapshot as absent and check if capture-snapshot is invoked.
+            leader.setSnapshot(Optional.<ByteString>absent());
 
-                    // new entry
-                    ReplicatedLogImplEntry entry =
-                        new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                            new MockRaftActorContext.MockPayload("D"));
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
 
+            actorContext.getReplicatedLog().append(entry);
 
-                    RaftState raftState = leader.handleMessage(senderActor, new SendInstallSnapshot());
+            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+            RaftActorBehavior raftBehavior = leader.handleMessage(
+                leaderActor, new InitiateInstallSnapshot());
 
-                    assertEquals(RaftState.Leader, raftState);
+            CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
+                getFirstMatching(leaderActor, CaptureSnapshot.class);
 
-                    // check if installsnapshot gets called with the correct values.
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            protected String match(Object in) {
-                                if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
-                                    InstallSnapshot is = (InstallSnapshot)
-                                        SerializationUtils.fromSerializable(in);
-                                    if (is.getData() == null) {
-                                        return "InstallSnapshot data is null";
-                                    }
-                                    if (is.getLastIncludedIndex() != snapshotIndex) {
-                                        return is.getLastIncludedIndex() + "!=" + snapshotIndex;
-                                    }
-                                    if (is.getLastIncludedTerm() != snapshotTerm) {
-                                        return is.getLastIncludedTerm() + "!=" + snapshotTerm;
-                                    }
-                                    if (is.getTerm() == currentTerm) {
-                                        return is.getTerm() + "!=" + currentTerm;
-                                    }
+            assertNotNull(cs);
 
-                                    return "match";
+            assertTrue(cs.isInstallSnapshotInitiated());
+            assertEquals(3, cs.getLastAppliedIndex());
+            assertEquals(1, cs.getLastAppliedTerm());
+            assertEquals(4, cs.getLastIndex());
+            assertEquals(2, cs.getLastTerm());
+        }};
+    }
 
-                               } else {
-                                    return "message mismatch:" + in.getClass();
-                                }
+    @Test
+    public void testInstallSnapshot() {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef followerActor = getTestActor();
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+            actorContext.setCommitIndex(followersLastIndex);
+
+            Leader leader = new Leader(actorContext);
+
+            // new entry
+            ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                    new MockRaftActorContext.MockPayload("D"));
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            // check if installsnapshot gets called with the correct values.
+            final String out =
+                new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    // do not put code outside this method, will run afterwards
+                    protected String match(Object in) {
+                        if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
+                            InstallSnapshot is = (InstallSnapshot)
+                                SerializationUtils.fromSerializable(in);
+                            if (is.getData() == null) {
+                                return "InstallSnapshot data is null";
+                            }
+                            if (is.getLastIncludedIndex() != snapshotIndex) {
+                                return is.getLastIncludedIndex() + "!=" + snapshotIndex;
+                            }
+                            if (is.getLastIncludedTerm() != snapshotTerm) {
+                                return is.getLastIncludedTerm() + "!=" + snapshotTerm;
+                            }
+                            if (is.getTerm() == currentTerm) {
+                                return is.getTerm() + "!=" + currentTerm;
                             }
-                        }.get(); // this extracts the received message
 
-                    assertEquals("match", out);
-                }
-            };
+                            return "match";
+
+                        } else {
+                            return "message mismatch:" + in.getClass();
+                        }
+                    }
+                }.get(); // this extracts the received message
+
+            assertEquals("match", out);
         }};
     }
 
     @Test
     public void testHandleInstallSnapshotReplyLastChunk() {
-        new LeaderTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+        new JavaTestKit(getSystem()) {{
 
-                    Map<String, String> peerAddresses = new HashMap();
-                    peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+            ActorRef followerActor = getTestActor();
 
-                    MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
-                    actorContext.setPeerAddresses(peerAddresses);
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
 
-                    final int followersLastIndex = 2;
-                    final int snapshotIndex = 3;
-                    final int newEntryIndex = 4;
-                    final int snapshotTerm = 1;
-                    final int currentTerm = 2;
-
-                    MockLeader leader = new MockLeader(actorContext);
-                    // set the follower info in leader
-                    leader.addToFollowerToLog(followerActor.path().toString(), followersLastIndex, -1);
-
-                    Map<String, String> leadersSnapshot = new HashMap<>();
-                    leadersSnapshot.put("1", "A");
-                    leadersSnapshot.put("2", "B");
-                    leadersSnapshot.put("3", "C");
-
-                    // set the snapshot variables in replicatedlog
-                    actorContext.getReplicatedLog().setSnapshot(
-                        toByteString(leadersSnapshot));
-                    actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                    actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-                    actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
-                    ByteString bs = toByteString(leadersSnapshot);
-                    leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
-                    while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
-                        leader.getFollowerToSnapshot().getNextChunk();
-                        leader.getFollowerToSnapshot().incrementChunkIndex();
-                    }
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int newEntryIndex = 4;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
 
-                    //clears leaders log
-                    actorContext.getReplicatedLog().removeFrom(0);
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
 
-                    RaftState raftState = leader.handleMessage(senderActor,
-                        new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
-                            leader.getFollowerToSnapshot().getChunkIndex(), true));
+            MockLeader leader = new MockLeader(actorContext);
 
-                    assertEquals(RaftState.Leader, raftState);
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
 
-                    assertEquals(leader.mapFollowerToSnapshot.size(), 0);
-                    assertEquals(leader.followerToLog.size(), 1);
-                    assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
-                    FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex, fli.getMatchIndex().get());
-                    assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
-                }
-            };
+            // set the snapshot variables in replicatedlog
+
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+            while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
+                leader.getFollowerToSnapshot().getNextChunk();
+                leader.getFollowerToSnapshot().incrementChunkIndex();
+            }
+
+            //clears leaders log
+            actorContext.getReplicatedLog().removeFrom(0);
+
+            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
+                new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
+                    leader.getFollowerToSnapshot().getChunkIndex(), true));
+
+            assertTrue(raftBehavior instanceof Leader);
+
+            assertEquals(leader.mapFollowerToSnapshot.size(), 0);
+            assertEquals(leader.followerToLog.size(), 1);
+            assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
+            FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
+            assertEquals(snapshotIndex, fli.getMatchIndex().get());
+            assertEquals(snapshotIndex, fli.getMatchIndex().get());
+            assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
         }};
     }
 
@@ -526,30 +655,298 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         return null;
     }
 
-    private static class LeaderTestKit extends JavaTestKit {
+    public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
+        private static AbstractRaftActorBehavior behavior;
 
-        private LeaderTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
-        }
+        public ForwardMessageToBehaviorActor(){
 
-        protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
-            // Wait for a specific log message to show up
-            final boolean result =
-            new JavaTestKit.EventFilter<Boolean>(logLevel
-            ) {
-                @Override
-                protected Boolean run() {
-                    return true;
-                }
-            }.from(subject.path().toString())
-                .message(logMessage)
-                .occurrences(1).exec();
+        }
 
-            Assert.assertEquals(true, result);
+        @Override public void onReceive(Object message) throws Exception {
+            super.onReceive(message);
+            behavior.handleMessage(sender(), message);
+        }
 
+        public static void setBehavior(AbstractRaftActorBehavior behavior){
+            ForwardMessageToBehaviorActor.behavior = behavior;
         }
     }
 
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            // follower too has the exact same log entries and has the same commit index
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            followerActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            // follower returns its next index
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
+
+    @Test
+    public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            ActorRef followerActor = getSystem().actorOf(
+                Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower", getSystem(), followerActor);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put(followerActor.path().toString(),
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            followerActorContext.getReplicatedLog().removeFrom(0);
+
+            followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            // follower has the same log entries but its commit index > leaders commit index
+            followerActorContext.setCommitIndex(2);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive(followerActor.path().toString());
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntriesMessages.AppendEntries appendEntries =
+                (AppendEntriesMessages.AppendEntries) MessageCollectorActor
+                    .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getLogEntries(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
+                    leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            assertEquals(2, appendEntriesReply.getLogLastIndex());
+            assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+        }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyFailure(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplySuccess() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+                Map<String, String> peerAddresses = new HashMap<>();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+                leaderActorContext.setCommitIndex(1);
+                leaderActorContext.setLastApplied(1);
+                leaderActorContext.getTermInformation().update(1, "leader");
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                assertEquals(2, leaderActorContext.getCommitIndex());
+
+                ApplyLogEntries applyLogEntries =
+                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
+                        ApplyLogEntries.class);
+
+                assertNotNull(applyLogEntries);
+
+                assertEquals(2, leaderActorContext.getLastApplied());
+
+                assertEquals(2, applyLogEntries.getToIndex());
+
+                List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+                    ApplyState.class);
+
+                assertEquals(1,applyStateList.size());
+
+                ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+                assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyUnknownFollower(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleRequestVoteReply(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+
+            }};
+
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
@@ -558,14 +955,6 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             super(context);
         }
 
-        public void addToFollowerToLog(String followerId, long nextIndex, long matchIndex) {
-            FollowerLogInformation followerLogInformation =
-                new FollowerLogInformationImpl(followerId,
-                    new AtomicLong(nextIndex),
-                    new AtomicLong(matchIndex));
-            followerToLog.put(followerId, followerLogInformation);
-        }
-
         public FollowerToSnapshot getFollowerToSnapshot() {
             return fts;
         }
@@ -576,4 +965,26 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
         }
     }
+
+    private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
+
+        private long electionTimeOutIntervalMillis;
+        private int snapshotChunkSize;
+
+        public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+            super();
+            this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
+            this.snapshotChunkSize = snapshotChunkSize;
+        }
+
+        @Override
+        public FiniteDuration getElectionTimeOutInterval() {
+            return new FiniteDuration(electionTimeOutIntervalMillis, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public int getSnapshotChunkSize() {
+            return snapshotChunkSize;
+        }
+    }
 }