Remove MockReplicatedLogEntry
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 6b0857351df132fd30b10406dae2acfe3cbd1237..9d64b140fb7d9db2c191eb12ffc847deb5273561 100644 (file)
@@ -1,44 +1,86 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
+
+    private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
+            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
 
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+    private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
+            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
 
-    private final ActorRef followerActor = getSystem().actorOf(Props.create(
-        DoNothingActor.class));
+    private Follower follower;
 
+    private final short payloadVersion = 5;
 
-    @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new Follower(actorContext);
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (follower != null) {
+            follower.close();
+        }
+
+        super.tearDown();
+    }
+
+    @Override
+    protected Follower createBehavior(RaftActorContext actorContext) {
+        return spy(new Follower(actorContext));
     }
 
     @Override
@@ -47,684 +89,1070 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected  MockRaftActorContext createActorContext(ActorRef actorRef){
-        return new MockRaftActorContext("test", getSystem(), actorRef);
+    protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
+        MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+        context.setPayloadVersion(payloadVersion );
+        return context;
+    }
+
+    @Test
+    public void testThatAnElectionTimeoutIsTriggered() {
+        MockRaftActorContext actorContext = createActorContext();
+        follower = new Follower(actorContext);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
+                actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
+    }
+
+    @Test
+    public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
+
+        MockRaftActorContext context = createActorContext();
+        follower = new Follower(context);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+
+        assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
-    public void testThatAnElectionTimeoutIsTriggered(){
-        new JavaTestKit(getSystem()) {{
+    public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
+
+        MockRaftActorContext context = createActorContext();
+        ((DefaultConfigParamsImpl) context.getConfigParams())
+                .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
+
+        follower = new Follower(context);
+        context.setCurrentBehavior(follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams()
+                .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+    }
 
-            new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
-                @Override
-                protected void run() {
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
+        logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
 
-                    Follower follower = new Follower(createActorContext(getTestActor()));
+        MockRaftActorContext context = createActorContext();
+        long term = 1000;
+        context.getTermInformation().update(term, null);
 
-                    final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if (in instanceof ElectionTimeout) {
-                                return true;
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
+        follower = createBehavior(context);
 
-                    assertEquals(true, out);
-                }
-            };
-        }};
+        follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
+
+        assertEquals("isVoteGranted", true, reply.isVoteGranted());
+        assertEquals("getTerm", term, reply.getTerm());
+        verify(follower).scheduleElection(any(FiniteDuration.class));
     }
 
     @Test
-    public void testHandleElectionTimeout(){
-        RaftActorContext raftActorContext = createActorContext();
-        Follower follower =
-            new Follower(raftActorContext);
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() {
+        logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
 
-        RaftActorBehavior raftBehavior =
-            follower.handleMessage(followerActor, new ElectionTimeout());
+        MockRaftActorContext context = createActorContext();
+        long term = 1000;
+        context.getTermInformation().update(term, "test");
 
-        assertTrue(raftBehavior instanceof Candidate);
+        follower = createBehavior(context);
+
+        follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999));
+
+        RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
+
+        assertEquals("isVoteGranted", false, reply.isVoteGranted());
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
+    }
+
+
+    @Test
+    public void testHandleFirstAppendEntries() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        Assert.assertEquals(1, context.getReplicatedLog().size());
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
-        new JavaTestKit(getSystem()) {{
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
+            throws Exception {
+        logStart("testHandleFirstAppendEntries");
 
-                    RaftActorContext context = createActorContext(getTestActor());
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
 
-                    context.getTermInformation().update(1000, null);
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
 
-                    RaftActorBehavior follower = createBehavior(context);
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
 
-                    follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999));
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-                    assertEquals(true, out);
-                }
-            };
-        }};
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
-        new JavaTestKit(getSystem()) {{
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
+            throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
 
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
 
-                    RaftActorContext context = createActorContext(getTestActor());
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
 
-                    context.getTermInformation().update(1000, "test");
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
 
-                    RaftActorBehavior follower = createBehavior(context);
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
+            throws Exception {
+        logStart(
+               "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
 
-                    follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999));
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
 
-                    final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
-                        // do not put code outside this method, will run afterwards
-                        @Override
-                        protected Boolean match(Object in) {
-                            if (in instanceof RequestVoteReply) {
-                                RequestVoteReply reply = (RequestVoteReply) in;
-                                return reply.isVoteGranted();
-                            } else {
-                                throw noMatch();
-                            }
-                        }
-                    }.get();
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 105, "foo"));
 
-                    assertEquals(false, out);
-                }
-            };
-        }};
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
     }
 
+    @Test
+    public void testHandleSyncUpAppendEntries() throws Exception {
+        logStart("testHandleSyncUpAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        followerActor.underlyingActor().clear();
+
+        // Sending the same message again should not generate another message
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertNull(syncStatus);
+
+    }
+
+    @Test
+    public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
+    @Test
+    public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor,
+                FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
     /**
      * This test verifies that when an AppendEntries RPC is received by a RaftActor
      * with a commitIndex that is greater than what has been applied to the
      * state machine of the RaftActor, the RaftActor applies the state and
      * sets it current applied state to the commitIndex of the sender.
-     *
-     * @throws Exception
      */
     @Test
     public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        logStart("testHandleAppendEntriesWithNewerCommitIndex");
 
-            RaftActorContext context =
-                createActorContext();
+        MockRaftActorContext context = createActorContext();
 
-            context.setLastApplied(100);
-            setLastLogEntry((MockRaftActorContext) context, 1, 100,
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
                 new MockRaftActorContext.MockPayload(""));
-            ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99);
+        context.getReplicatedLog().setSnapshotIndex(99);
 
-            List<ReplicatedLogEntry> entries =
-                Arrays.asList(
-                        (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101,
-                                new MockRaftActorContext.MockPayload("foo"))
-                );
+        List<ReplicatedLogEntry> entries = Arrays.<ReplicatedLogEntry>asList(
+                newReplicatedLogEntry(2, 101, "foo"));
 
-            // The new commitIndex is 101
-            AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
-            RaftActorBehavior raftBehavior =
-                createBehavior(context).handleMessage(getRef(), appendEntries);
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
 
-            assertEquals(101L, context.getLastApplied());
-
-        }};
+        assertEquals("getLastApplied", 101L, context.getLastApplied());
     }
 
     /**
      * This test verifies that when an AppendEntries is received a specific prevLogTerm
      * which does not match the term that is in RaftActors log entry at prevLogIndex
      * then the RaftActor does not change it's state and it returns a failure.
-     *
-     * @throws Exception
      */
     @Test
-    public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm()
-        throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            MockRaftActorContext context = createActorContext();
+    public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() {
+        logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm");
 
-            // First set the receivers term to lower number
-            context.getTermInformation().update(95, "test");
+        MockRaftActorContext context = createActorContext();
 
-            // Set the last log entry term for the receiver to be greater than
-            // what we will be sending as the prevLogTerm in AppendEntries
-            MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog =
-                setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload(""));
+        // First set the receivers term to lower number
+        context.getTermInformation().update(95, "test");
 
-            // AppendEntries is now sent with a bigger term
-            // this will set the receivers term to be the same as the sender's term
-            AppendEntries appendEntries =
-                new AppendEntries(100, "leader-1", 0, 0, null, 101, -1);
+        // AppendEntries is now sent with a bigger term
+        // this will set the receivers term to be the same as the sender's term
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
 
-            RaftActorBehavior behavior = createBehavior(context);
+        follower = createBehavior(context);
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
 
-            RaftActorBehavior raftBehavior =
-                behavior.handleMessage(getRef(), appendEntries);
+        Assert.assertSame(follower, newBehavior);
 
-            assertEquals(expected, raftBehavior);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
-
-            assertEquals(false, out);
-
-
-        }};
+        assertEquals("isSuccess", false, reply.isSuccess());
     }
 
-
-
     /**
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver match that the new
      * entries get added to the log and the log is incremented by the number of
-     * entries received in appendEntries
-     *
-     * @throws Exception
+     * entries received in appendEntries.
      */
     @Test
-    public void testHandleAppendEntriesAddNewEntries() throws Exception {
-        new JavaTestKit(getSystem()) {{
-
-            MockRaftActorContext context = createActorContext();
+    public void testHandleAppendEntriesAddNewEntries() {
+        logStart("testHandleAppendEntriesAddNewEntries");
 
-            // First set the receivers term to lower number
-            context.getTermInformation().update(1, "test");
+        MockRaftActorContext context = createActorContext();
 
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                new MockRaftActorContext.SimpleReplicatedLog();
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
 
-            context.setReplicatedLog(log);
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
 
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three")));
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four")));
+        context.setReplicatedLog(log);
 
-            // Send appendEntries with the same term as was set on the receiver
-            // before the new behavior was created (1 in this case)
-            // This will not work for a Candidate because as soon as a Candidate
-            // is created it increments the term
-            AppendEntries appendEntries =
-                new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(1, 3, "three"));
+        entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-            RaftActorBehavior behavior = createBehavior(context);
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        short leaderPayloadVersion = 10;
+        String leaderId = "leader-1";
+        AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        follower = createBehavior(context);
 
-            RaftActorBehavior raftBehavior =
-                behavior.handleMessage(getRef(), appendEntries);
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
 
-            assertEquals(expected, raftBehavior);
-            assertEquals(5, log.last().getIndex() + 1);
-            assertNotNull(log.get(3));
-            assertNotNull(log.get(4));
+        Assert.assertSame(follower, newBehavior);
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
+        assertEquals("Next index", 5, log.last().getIndex() + 1);
+        assertEquals("Entry 3", entries.get(0), log.get(3));
+        assertEquals("Entry 4", entries.get(1), log.get(4));
 
-            assertEquals(true, out);
+        assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
+        assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
 
-
-        }};
+        expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
-
-
     /**
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver are out-of-sync that
      * the log is first corrected by removing the out of sync entries from the
-     * log and then adding in the new entries sent with the AppendEntries message
-     *
-     * @throws Exception
+     * log and then adding in the new entries sent with the AppendEntries message.
      */
     @Test
-    public void testHandleAppendEntriesCorrectReceiverLogEntries()
-        throws Exception {
-        new JavaTestKit(getSystem()) {{
+    public void testHandleAppendEntriesCorrectReceiverLogEntries() {
+        logStart("testHandleAppendEntriesCorrectReceiverLogEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
 
-            MockRaftActorContext context = createActorContext();
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
 
-            // First set the receivers term to lower number
-            context.getTermInformation().update(2, "test");
+        context.setReplicatedLog(log);
 
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                new MockRaftActorContext.SimpleReplicatedLog();
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
-            log.append(
-                new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+        entries.add(newReplicatedLogEntry(2, 3, "three"));
 
-            context.setReplicatedLog(log);
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1")));
-            entries.add(
-                new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three")));
+        follower = createBehavior(context);
 
-            // Send appendEntries with the same term as was set on the receiver
-            // before the new behavior was created (1 in this case)
-            // This will not work for a Candidate because as soon as a Candidate
-            // is created it increments the term
-            AppendEntries appendEntries =
-                new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1);
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
 
-            RaftActorBehavior behavior = createBehavior(context);
+        Assert.assertSame(follower, newBehavior);
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        // The entry at index 2 will be found out-of-sync with the leader
+        // and will be removed
+        // Then the two new entries will be added to the log
+        // Thus making the log to have 4 entries
+        assertEquals("Next index", 4, log.last().getIndex() + 1);
+        //assertEquals("Entry 2", entries.get(0), log.get(2));
 
-            RaftActorBehavior raftBehavior =
-                behavior.handleMessage(getRef(), appendEntries);
+        assertEquals("Entry 1 data", "one", log.get(1).getData().toString());
 
-            assertEquals(expected, raftBehavior);
+        // Check that the entry at index 2 has the new data
+        assertEquals("Entry 2", entries.get(0), log.get(2));
 
-            // The entry at index 2 will be found out-of-sync with the leader
-            // and will be removed
-            // Then the two new entries will be added to the log
-            // Thus making the log to have 4 entries
-            assertEquals(4, log.last().getIndex() + 1);
-            assertNotNull(log.get(2));
+        assertEquals("Entry 3", entries.get(1), log.get(3));
 
-            assertEquals("one", log.get(1).getData().toString());
+        expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
+    }
 
-            // Check that the entry at index 2 has the new data
-            assertEquals("two-1", log.get(2).getData().toString());
+    @Test
+    public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+        logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
 
-            assertEquals("three", log.get(3).getData().toString());
+        MockRaftActorContext context = createActorContext();
 
-            assertNotNull(log.get(3));
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
 
-            assertEquals(true, out);
+        context.setReplicatedLog(log);
 
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+        entries.add(newReplicatedLogEntry(2, 3, "three"));
 
-        }};
-    }
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
-    @Test
-    public void testHandleAppendEntriesPreviousLogEntryMissing(){
-        new JavaTestKit(getSystem()) {{
+        context.setRaftPolicy(createRaftPolicy(false, true));
+        follower = createBehavior(context);
 
-            MockRaftActorContext context = createActorContext();
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
 
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                    new MockRaftActorContext.SimpleReplicatedLog();
-            log.append(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
-            log.append(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
-            log.append(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+        Assert.assertSame(follower, newBehavior);
 
-            context.setReplicatedLog(log);
+        expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+    }
 
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+    @Test
+    public void testHandleAppendEntriesPreviousLogEntryMissing() {
+        logStart("testHandleAppendEntriesPreviousLogEntryMissing");
 
-            AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1);
+        final MockRaftActorContext context = createActorContext();
 
-            RaftActorBehavior behavior = createBehavior(context);
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        context.setReplicatedLog(log);
 
-            RaftActorBehavior raftBehavior =
-                    behavior.handleMessage(getRef(), appendEntries);
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-            assertEquals(expected, raftBehavior);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                    "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
+        follower = createBehavior(context);
 
-            assertEquals(false, out);
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
 
-        }};
+        Assert.assertSame(follower, newBehavior);
 
+        expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2);
     }
 
     @Test
-    public void testHandleAppendAfterInstallingSnapshot(){
-        new JavaTestKit(getSystem()) {{
+    public void testHandleAppendEntriesWithExistingLogEntry() {
+        logStart("testHandleAppendEntriesWithExistingLogEntry");
+
+        MockRaftActorContext context = createActorContext();
+
+        context.getTermInformation().update(1, "test");
 
-            MockRaftActorContext context = createActorContext();
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
 
+        context.setReplicatedLog(log);
 
-            // Prepare the receivers log
-            MockRaftActorContext.SimpleReplicatedLog log =
-                    new MockRaftActorContext.SimpleReplicatedLog();
+        // Send the last entry again.
+        List<ReplicatedLogEntry> entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"));
 
-            // Set up a log as if it has been snapshotted
-            log.setSnapshotIndex(3);
-            log.setSnapshotTerm(1);
+        follower = createBehavior(context);
 
-            context.setReplicatedLog(log);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
 
-            // Prepare the entries to be sent with AppendEntries
-            List<ReplicatedLogEntry> entries = new ArrayList<>();
-            entries.add(
-                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+        assertEquals("Next index", 2, log.last().getIndex() + 1);
+        assertEquals("Entry 1", entries.get(0), log.get(1));
 
-            AppendEntries appendEntries =
-                    new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3);
+        expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1);
 
-            RaftActorBehavior behavior = createBehavior(context);
+        // Send the last entry again and also a new one.
+
+        entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
+
+        leaderActor.underlyingActor().clear();
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
+
+        assertEquals("Next index", 3, log.last().getIndex() + 1);
+        assertEquals("Entry 1", entries.get(0), log.get(1));
+        assertEquals("Entry 2", entries.get(1), log.get(2));
+
+        expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2);
+    }
+
+    @Test
+    public void testHandleAppendEntriesAfterInstallingSnapshot() {
+        logStart("testHandleAppendAfterInstallingSnapshot");
 
-            // Send an unknown message so that the state of the RaftActor remains unchanged
-            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+        MockRaftActorContext context = createActorContext();
 
-            RaftActorBehavior raftBehavior =
-                    behavior.handleMessage(getRef(), appendEntries);
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
 
-            assertEquals(expected, raftBehavior);
+        // Set up a log as if it has been snapshotted
+        log.setSnapshotIndex(3);
+        log.setSnapshotTerm(1);
 
-            // Also expect an AppendEntriesReply to be sent where success is false
-            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
-                    "AppendEntriesReply") {
-                // do not put code outside this method, will run afterwards
-                @Override
-                protected Boolean match(Object in) {
-                    if (in instanceof AppendEntriesReply) {
-                        AppendEntriesReply reply = (AppendEntriesReply) in;
-                        return reply.isSuccess();
-                    } else {
-                        throw noMatch();
-                    }
-                }
-            }.get();
+        context.setReplicatedLog(log);
 
-            assertEquals(true, out);
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        }};
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
 
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
 
     /**
      * This test verifies that when InstallSnapshot is received by
      * the follower its applied correctly.
-     *
-     * @throws Exception
      */
     @Test
     public void testHandleInstallSnapshot() throws Exception {
-        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{
-
-            ActorRef leaderActor = getSystem().actorOf(Props.create(
-                MessageCollectorActor.class));
-
-            MockRaftActorContext context = createActorContext(getRef());
-
-            Follower follower = (Follower)createBehavior(context);
-
-            HashMap<String, String> followerSnapshot = new HashMap<>();
-            followerSnapshot.put("1", "A");
-            followerSnapshot.put("2", "B");
-            followerSnapshot.put("3", "C");
-
-            ByteString bsSnapshot  = toByteString(followerSnapshot);
-            ByteString chunkData = ByteString.EMPTY;
-            int offset = 0;
-            int snapshotLength = bsSnapshot.size();
-            int i = 1;
-            int chunkIndex = 1;
-
-            do {
-                chunkData = getNextChunk(bsSnapshot, offset);
-                final InstallSnapshot installSnapshot =
-                    new InstallSnapshot(1, "leader-1", i, 1,
-                        chunkData, chunkIndex, 3);
-                follower.handleMessage(leaderActor, installSnapshot);
-                offset = offset + 50;
-                i++;
-                chunkIndex++;
-            } while ((offset+50) < snapshotLength);
-
-            final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3);
-            follower.handleMessage(leaderActor, installSnapshot3);
-
-            String[] matches = new ReceiveWhile<String>(String.class, duration("2 seconds")) {
-                @Override
-                protected String match(Object o) throws Exception {
-                    if (o instanceof ApplySnapshot) {
-                        ApplySnapshot as = (ApplySnapshot)o;
-                        if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) {
-                            return "applySnapshot-lastIndex-mismatch";
-                        }
-                        if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) {
-                            return "applySnapshot-lastAppliedTerm-mismatch";
-                        }
-                        if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) {
-                            return "applySnapshot-lastAppliedIndex-mismatch";
-                        }
-                        if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) {
-                            return "applySnapshot-lastTerm-mismatch";
-                        }
-                        return "applySnapshot";
-                    }
-
-                    return "ignoreCase";
-                }
-            }.get();
-
-            // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty
-            assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
-
-            String applySnapshotMatch = "";
-            for (String reply: matches) {
-                if (reply.startsWith("applySnapshot")) {
-                    applySnapshotMatch = reply;
-                }
-            }
+        logStart("testHandleInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1, "leader");
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int offset = 0;
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+        int chunkIndex = 1;
+        InstallSnapshot lastInstallSnapshot = null;
+
+        for (int i = 0; i < totalChunks; i++) {
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                    chunkData, chunkIndex, totalChunks);
+            follower.handleMessage(leaderActor, lastInstallSnapshot);
+            offset = offset + 50;
+            lastIncludedIndex++;
+            chunkIndex++;
+        }
 
-            assertEquals("applySnapshot", applySnapshotMatch);
+        ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
+                ApplySnapshot.class);
+        Snapshot snapshot = applySnapshot.getSnapshot();
+        assertNotNull(lastInstallSnapshot);
+        assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
+        assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
+                snapshot.getLastAppliedTerm());
+        assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
+                snapshot.getLastAppliedIndex());
+        assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
+        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+        applySnapshot.getCallback().onSuccess();
+
+        List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
+                leaderActor, InstallSnapshotReply.class);
+        assertEquals("InstallSnapshotReply count", totalChunks, replies.size());
+
+        chunkIndex = 1;
+        for (InstallSnapshotReply reply: replies) {
+            assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex());
+            assertEquals("getTerm", 1, reply.getTerm());
+            assertEquals("isSuccess", true, reply.isSuccess());
+            assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
+        }
 
-            Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+    }
 
-            assertNotNull(messages);
-            assertTrue(messages instanceof List);
-            List<Object> listMessages = (List<Object>) messages;
 
-            int installSnapshotReplyReceivedCount = 0;
-            for (Object message: listMessages) {
-                if (message instanceof InstallSnapshotReply) {
-                    ++installSnapshotReplyReceivedCount;
-                }
-            }
+    /**
+     * Verify that when an AppendEntries is sent to a follower during a snapshot install
+     * the Follower short-circuits the processing of the AppendEntries message.
+     */
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
+
+        // Send an append entry
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+        assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+        assertNotNull(follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
 
-            assertEquals(3, installSnapshotReplyReceivedCount);
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
 
-        }};
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
+
+        // Send appendEntries with a new term and leader.
+        AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+        assertEquals("getTerm", 2, reply.getTerm());
+
+        assertNull(follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
+        logStart("testInitialSyncUpWithHandleInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+        context.setCommitIndex(-1);
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int offset = 0;
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+        int chunkIndex = 1;
+        InstallSnapshot lastInstallSnapshot = null;
+
+        for (int i = 0; i < totalChunks; i++) {
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                    chunkData, chunkIndex, totalChunks);
+            follower.handleMessage(leaderActor, lastInstallSnapshot);
+            offset = offset + 50;
+            lastIncludedIndex++;
+            chunkIndex++;
+        }
+
+        FollowerInitialSyncUpStatus syncStatus =
+                MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
     }
 
     @Test
     public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
-        JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {
-            {
+        logStart("testHandleOutOfSequenceInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
 
-                ActorRef leaderActor = getSystem().actorOf(Props.create(
-                        MessageCollectorActor.class));
+        follower = createBehavior(context);
 
-                MockRaftActorContext context = createActorContext(getRef());
+        ByteString bsSnapshot = createSnapshot();
 
-                Follower follower = (Follower) createBehavior(context);
+        InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
+                getNextChunk(bsSnapshot, 10, 50), 3, 3);
+        follower.handleMessage(leaderActor, installSnapshot);
 
-                HashMap<String, String> followerSnapshot = new HashMap<>();
-                followerSnapshot.put("1", "A");
-                followerSnapshot.put("2", "B");
-                followerSnapshot.put("3", "C");
+        InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                InstallSnapshotReply.class);
 
-                ByteString bsSnapshot = toByteString(followerSnapshot);
+        assertEquals("isSuccess", false, reply.isSuccess());
+        assertEquals("getChunkIndex", -1, reply.getChunkIndex());
+        assertEquals("getTerm", 1, reply.getTerm());
+        assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
 
-                final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3);
-                follower.handleMessage(leaderActor, installSnapshot);
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() {
+        MockRaftActorContext context = createActorContext();
 
-                Object messages = executeLocalOperation(leaderActor, "get-all-messages");
+        Stopwatch stopwatch = Stopwatch.createStarted();
 
-                assertNotNull(messages);
-                assertTrue(messages instanceof List);
-                List<Object> listMessages = (List<Object>) messages;
+        follower = createBehavior(context);
 
-                int installSnapshotReplyReceivedCount = 0;
-                for (Object message: listMessages) {
-                    if (message instanceof InstallSnapshotReply) {
-                        ++installSnapshotReplyReceivedCount;
-                    }
-                }
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
-                assertEquals(1, installSnapshotReplyReceivedCount);
-                InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0);
-                assertEquals(false, reply.isSuccess());
-                assertEquals(-1, reply.getChunkIndex());
-                assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected());
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
 
+        assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
 
-            }};
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertTrue("Expected Candidate", newBehavior instanceof Candidate);
     }
 
-    public Object executeLocalOperation(ActorRef actor, Object message) throws Exception {
-        return MessageCollectorActor.getAllMessages(actor);
+    @Test
+    public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() {
+        MockRaftActorContext context = createActorContext();
+        context.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public FiniteDuration getElectionTimeOutInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        context.setRaftPolicy(createRaftPolicy(false, false));
+
+        follower = createBehavior(context);
+
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertSame("handleMessage result", follower, newBehavior);
+    }
+
+    @Test
+    public void testFollowerSchedulesElectionIfNonVoting() {
+        MockRaftActorContext context = createActorContext();
+        context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
+
+        follower = new Follower(context, "leader", (short)1);
+
+        ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
+                ElectionTimeout.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+        assertSame("handleMessage result", follower, newBehavior);
+        assertNull("Expected null leaderId", follower.getLeaderId());
     }
 
-    public ByteString getNextChunk (ByteString bs, int offset){
+    @Test
+    public void testElectionScheduledWhenAnyRaftRPCReceived() {
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, new RaftRPC() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public long getTerm() {
+                return 100;
+            }
+        });
+        verify(follower).scheduleElection(any(FiniteDuration.class));
+    }
+
+    @Test
+    public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() {
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, "non-raft-rpc");
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
+    }
+
+    public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;
-        int size = 50;
-        if (50 > snapshotLength) {
+        int size = chunkSize;
+        if (chunkSize > snapshotLength) {
             size = snapshotLength;
         } else {
-            if ((start + 50) > snapshotLength) {
+            if (start + chunkSize > snapshotLength) {
                 size = snapshotLength - start;
             }
         }
-        return bs.substring(start, start + size);
-    }
-
-    private ByteString toByteString(Map<String, String> state) {
-        ByteArrayOutputStream b = null;
-        ObjectOutputStream o = null;
-        try {
-            try {
-                b = new ByteArrayOutputStream();
-                o = new ObjectOutputStream(b);
-                o.writeObject(state);
-                byte[] snapshotBytes = b.toByteArray();
-                return ByteString.copyFrom(snapshotBytes);
-            } finally {
-                if (o != null) {
-                    o.flush();
-                    o.close();
-                }
-                if (b != null) {
-                    b.close();
-                }
-            }
-        } catch (IOException e) {
-            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
-        }
-        return null;
+
+        byte[] nextChunk = new byte[size];
+        bs.copyTo(nextChunk, start, 0, size);
+        return nextChunk;
+    }
+
+    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+            String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+        expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+    }
+
+    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+                                                   String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+                                                   boolean expForceInstallSnapshot) {
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
+                AppendEntriesReply.class);
+
+        assertEquals("isSuccess", expSuccess, reply.isSuccess());
+        assertEquals("getTerm", expTerm, reply.getTerm());
+        assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
+        assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
+        assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+        assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
+    }
+
+
+    private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+        return new SimpleReplicatedLogEntry(index, term,
+                new MockRaftActorContext.MockPayload(data));
+    }
+
+    private ByteString createSnapshot() {
+        HashMap<String, String> followerSnapshot = new HashMap<>();
+        followerSnapshot.put("1", "A");
+        followerSnapshot.put("2", "B");
+        followerSnapshot.put("3", "C");
+
+        return toByteString(followerSnapshot);
+    }
+
+    @Override
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
+            ActorRef actorRef, RaftRPC rpc) throws Exception {
+        super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+
+        String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
+        assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
+    }
+
+    @Override
+    protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
+            throws Exception {
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
     }
 }