Refactor LeaderTest
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 119b958799e815d21b22c18dae11e9ff7415d928..f087793674ec7f304192ec55871379f1adbfc03b 100644 (file)
@@ -10,6 +10,7 @@ import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
@@ -19,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -35,810 +38,658 @@ import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderChec
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private final ActorRef leaderActor =
-        getSystem().actorOf(Props.create(DoNothingActor.class));
-    private final ActorRef senderActor =
-        getSystem().actorOf(Props.create(DoNothingActor.class));
+    static final String FOLLOWER_ID = "follower";
 
-    @Test
-    public void testHandleMessageForUnknownMessage() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            Leader leader =
-                new Leader(createActorContext());
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
-            // handle message should return the Leader state when it receives an
-            // unknown message
-            RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
-            Assert.assertTrue(behavior instanceof Leader);
-        }};
+    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
+
+    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
+
+    private Leader leader;
+
+    @After
+    public void tearDown() throws Exception {
+        if(leader != null) {
+            leader.close();
+        }
+
+        actorFactory.close();
+    }
+
+    private void logStart(String name) {
+        LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
     }
 
     @Test
-    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+    public void testHandleMessageForUnknownMessage() throws Exception {
+        logStart("testHandleMessageForUnknownMessage");
+
+        leader = new Leader(createActorContext());
 
-                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+        // handle message should return the Leader state when it receives an
+        // unknown message
+        RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
+        Assert.assertTrue(behavior instanceof Leader);
+    }
 
-                    Map<String, String> peerAddresses = new HashMap<>();
+    @Test
+    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+        logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
-                    String followerId = "follower";
-                    peerAddresses.put(followerId, followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                    actorContext.setPeerAddresses(peerAddresses);
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
 
-                    long term = 1;
-                    actorContext.getTermInformation().update(term, "");
+        leader = new Leader(actorContext);
 
-                    Leader leader = new Leader(actorContext);
+        // Leader should send an immediate heartbeat with no entries as follower is inactive.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getTerm", term, appendEntries.getTerm());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
 
-                    // Leader should send an immediate heartbeat with no entries as follower is inactive.
-                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
-                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getTerm", term, appendEntries.getTerm());
-                    assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        // The follower would normally reply - simulate that explicitly here.
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex - 1, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
-                    // The follower would normally reply - simulate that explicitly here.
-                    leader.handleMessage(followerActor, new AppendEntriesReply(
-                            followerId, term, true, lastIndex - 1, term));
-                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+        followerActor.underlyingActor().clear();
 
-                    // Sleep for the heartbeat interval so AppendEntries is sent.
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
-                            getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+                getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
-                    leader.handleMessage(senderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-                    appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
-                    assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
-                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-                }
-            };
-        }};
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
     }
 
     @Test
-    public void testHandleReplicateMessageSendAppendEntriesToFollower() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
-
-                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
-
-                    Map<String, String> peerAddresses = new HashMap<>();
+    public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
 
-                    String followerId = "follower";
-                    peerAddresses.put(followerId, followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                    actorContext.setPeerAddresses(peerAddresses);
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
 
-                    long term = 1;
-                    actorContext.getTermInformation().update(term, "");
+        leader = new Leader(actorContext);
 
-                    Leader leader = new Leader(actorContext);
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-                    // Leader will send an immediate heartbeat - ignore it.
-                    expectMsgClass(duration("5 seconds"), AppendEntries.class);
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
-                    // The follower would normally reply - simulate that explicitly here.
-                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
-                    leader.handleMessage(followerActor, new AppendEntriesReply(
-                            followerId, term, true, lastIndex, term));
-                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+        followerActor.underlyingActor().clear();
 
-                    MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-                    MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                            1, lastIndex + 1, payload);
-                    actorContext.getReplicatedLog().append(newEntry);
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                            new Replicate(null, null, newEntry));
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, lastIndex + 1, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(null, null, newEntry));
 
-                    // State should not change
-                    assertTrue(raftBehavior instanceof Leader);
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
 
-                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
-                    assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
-                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-                    assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
-                }
-            };
-        }};
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
     }
 
     @Test
-    public void testHandleReplicateMessageWhenThereAreNoFollowers() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+    public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+        logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
 
-                    ActorRef raftActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContext();
 
-                    MockRaftActorContext actorContext =
-                        new MockRaftActorContext("test", getSystem(), raftActor);
+        leader = new Leader(actorContext);
 
-                    actorContext.getReplicatedLog().removeFrom(0);
+        actorContext.setLastApplied(0);
 
-                    actorContext.setReplicatedLog(
-                        new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
-                            .build());
+        long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+        long term = actorContext.getTermInformation().getCurrentTerm();
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
 
-                    Leader leader = new Leader(actorContext);
-                    RaftActorBehavior raftBehavior = leader
-                        .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
+        actorContext.getReplicatedLog().append(newEntry);
 
-                    // State should not change
-                    assertTrue(raftBehavior instanceof Leader);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(leaderActor, "state-id", newEntry));
 
-                    assertEquals(1, actorContext.getCommitIndex());
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"),
-                            "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                if (in instanceof ApplyState) {
-                                    if (((ApplyState) in).getIdentifier().equals("state-id")) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
+        assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
 
-                    assertEquals("match", out);
+        // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
+        // one since lastApplied state is 0.
+        List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
+                leaderActor, ApplyState.class);
+        assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-                }
-            };
-        }};
+        for(int i = 0; i <= newLogIndex - 1; i++ ) {
+            ApplyState applyState = applyStateList.get(i);
+            assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
+            assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+        }
+
+        ApplyState last = applyStateList.get((int) newLogIndex - 1);
+        assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+        assertEquals("getIdentifier", "state-id", last.getIdentifier());
     }
 
     @Test
     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext(leaderActor);
-            actorContext.setPeerAddresses(peerAddresses);
-
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
-
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setCommitIndex(followersLastIndex);
-            //set follower timeout to 2 mins, helps during debugging
-            actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
-
-            MockLeader leader = new MockLeader(actorContext);
-
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
-            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            //send first chunk and no InstallSnapshotReply received yet
-            leader.getFollowerToSnapshot().getNextChunk();
-            leader.getFollowerToSnapshot().incrementChunkIndex();
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setCommitIndex(followersLastIndex);
+        //set follower timeout to 2 mins, helps during debugging
+        actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
-            AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
-                followerActor, AppendEntries.class);
+        leader = new Leader(actorContext);
 
-            assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
-                "received", aeproto);
+        // new entry
+        ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                        new MockRaftActorContext.MockPayload("D"));
 
-            AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
-            //InstallSnapshotReply received
-            leader.getFollowerToSnapshot().markSendStatus(true);
+        //send first chunk and no InstallSnapshotReply received yet
+        fts.getNextChunk();
+        fts.incrementChunkIndex();
 
-            leader.handleMessage(senderActor, new SendHeartBeat());
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
-                InstallSnapshot.SERIALIZABLE_CLASS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
-                isproto);
+        AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+        AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
 
-            assertEquals(snapshotIndex, is.getLastIncludedIndex());
+        assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
-        }};
-    }
+        //InstallSnapshotReply received
+        fts.markSendStatus(true);
 
-    @Test
-    public void testSendAppendEntriesSnapshotScenario() {
-        new JavaTestKit(getSystem()) {{
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            ActorRef followerActor = getTestActor();
+        InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.SERIALIZABLE_CLASS);
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+        InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
 
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext(getRef());
-            actorContext.setPeerAddresses(peerAddresses);
+        assertEquals(snapshotIndex, is.getLastIncludedIndex());
+    }
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+    @Test
+    public void testSendAppendEntriesSnapshotScenario() throws Exception {
+        logStart("testSendAppendEntriesSnapshotScenario");
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setCommitIndex(followersLastIndex);
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            Leader leader = new Leader(actorContext);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        leader = new Leader(actorContext);
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-            RaftActorBehavior raftBehavior = leader.handleMessage(
-                senderActor, new Replicate(null, "state-id", entry));
+        // new entry
+        ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                        new MockRaftActorContext.MockPayload("D"));
 
-            assertTrue(raftBehavior instanceof Leader);
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
-            Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
-                @Override
-                protected Boolean match(Object o) throws Exception {
-                    if (o instanceof CaptureSnapshot) {
-                        return true;
-                    }
-                    return false;
-                }
-            }.get();
+        // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+        RaftActorBehavior raftBehavior = leader.handleMessage(
+                leaderActor, new Replicate(null, "state-id", entry));
 
-            boolean captureSnapshot = false;
-            for (Boolean b: matches) {
-                captureSnapshot = b | captureSnapshot;
-            }
+        assertTrue(raftBehavior instanceof Leader);
 
-            assertTrue(captureSnapshot);
-        }};
+        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
     }
 
     @Test
     public void testInitiateInstallSnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        logStart("testInitiateInstallSnapshot");
 
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ActorRef followerActor = getTestActor();
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
-
-            MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
-            actorContext.setPeerAddresses(peerAddresses);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setLastApplied(3);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setLastApplied(3);
-            actorContext.setCommitIndex(followersLastIndex);
+        leader = new Leader(actorContext);
 
-            Leader leader = new Leader(actorContext);
-            // set the snapshot as absent and check if capture-snapshot is invoked.
-            leader.setSnapshot(Optional.<ByteString>absent());
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // new entry
-            ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        // set the snapshot as absent and check if capture-snapshot is invoked.
+        leader.setSnapshot(Optional.<ByteString>absent());
 
-            actorContext.getReplicatedLog().append(entry);
+        // new entry
+        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                new MockRaftActorContext.MockPayload("D"));
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        actorContext.getReplicatedLog().append(entry);
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(
-                    senderActor, new Replicate(null, "state-id", entry));
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            CaptureSnapshot cs = MessageCollectorActor.
-                getFirstMatching(leaderActor, CaptureSnapshot.class);
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-            assertNotNull(cs);
+        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
 
-            assertTrue(cs.isInstallSnapshotInitiated());
-            assertEquals(3, cs.getLastAppliedIndex());
-            assertEquals(1, cs.getLastAppliedTerm());
-            assertEquals(4, cs.getLastIndex());
-            assertEquals(2, cs.getLastTerm());
+        assertTrue(cs.isInstallSnapshotInitiated());
+        assertEquals(3, cs.getLastAppliedIndex());
+        assertEquals(1, cs.getLastAppliedTerm());
+        assertEquals(4, cs.getLastIndex());
+        assertEquals(2, cs.getLastTerm());
 
-            // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-            leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
-            List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-            assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        }};
+        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
     }
 
     @Test
-    public void testInstallSnapshot() {
-        new JavaTestKit(getSystem()) {{
+    public void testInstallSnapshot() throws Exception {
+        logStart("testInstallSnapshot");
 
-            ActorRef followerActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            actorContext.setPeerAddresses(peerAddresses);
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(followersLastIndex);
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        leader = new Leader(actorContext);
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        // Ignore initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-            actorContext.setCommitIndex(followersLastIndex);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(toByteString(leadersSnapshot)));
 
-            Leader leader = new Leader(actorContext);
+        assertTrue(raftBehavior instanceof Leader);
 
-            // Ignore initial heartbeat.
-            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+        // check if installsnapshot gets called with the correct values.
 
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
+                MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+        assertNotNull(installSnapshot.getData());
+        assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
 
-            assertTrue(raftBehavior instanceof Leader);
-
-            // check if installsnapshot gets called with the correct values.
-            final String out =
-                new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                    // do not put code outside this method, will run afterwards
-                    @Override
-                    protected String match(Object in) {
-                        if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
-                            InstallSnapshot is = (InstallSnapshot)
-                                SerializationUtils.fromSerializable(in);
-                            if (is.getData() == null) {
-                                return "InstallSnapshot data is null";
-                            }
-                            if (is.getLastIncludedIndex() != snapshotIndex) {
-                                return is.getLastIncludedIndex() + "!=" + snapshotIndex;
-                            }
-                            if (is.getLastIncludedTerm() != snapshotTerm) {
-                                return is.getLastIncludedTerm() + "!=" + snapshotTerm;
-                            }
-                            if (is.getTerm() == currentTerm) {
-                                return is.getTerm() + "!=" + currentTerm;
-                            }
-
-                            return "match";
-
-                        } else {
-                            return "message mismatch:" + in.getClass();
-                        }
-                    }
-                }.get(); // this extracts the received message
-
-            assertEquals("match", out);
-        }};
+        // FIXME - we don't set the term in the serialized message.
+        //assertEquals(currentTerm, installSnapshot.getTerm());
     }
 
     @Test
-    public void testHandleInstallSnapshotReplyLastChunk() {
-        new JavaTestKit(getSystem()) {{
+    public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
+        logStart("testHandleInstallSnapshotReplyLastChunk");
 
-            ActorRef followerActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
-
-            MockLeader leader = new MockLeader(actorContext);
-
-            // Ignore initial heartbeat.
-            expectMsgClass(duration("5 seconds"), AppendEntries.class);
-
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
-
-            // set the snapshot variables in replicatedlog
-
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
-            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
-            while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
-                leader.getFollowerToSnapshot().getNextChunk();
-                leader.getFollowerToSnapshot().incrementChunkIndex();
-            }
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
-                    leader.getFollowerToSnapshot().getChunkIndex(), true));
+        leader = new Leader(actorContext);
 
-            assertTrue(raftBehavior instanceof Leader);
+        // Ignore initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            assertEquals(0, leader.followerSnapshotSize());
-            assertEquals(1, leader.followerLogSize());
-            assertNotNull(leader.getFollower(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex());
-            assertEquals(snapshotIndex, fli.getMatchIndex());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex());
-        }};
-    }
-    @Test
-    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            TestActorRef<MessageCollectorActor> followerActor =
-                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+        // set the snapshot variables in replicatedlog
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower-reply",
-                followerActor.path().toString());
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
-                @Override
-                public int getSnapshotChunkSize() {
-                    return 50;
-                }
-            };
-            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        while(!fts.isLastChunk(fts.getChunkIndex())) {
+            fts.getNextChunk();
+            fts.incrementChunkIndex();
+        }
+
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            actorContext.setConfigParams(configParams);
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
+        RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
+                new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
 
-            MockLeader leader = new MockLeader(actorContext);
+        assertTrue(raftBehavior instanceof Leader);
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        assertEquals(0, leader.followerSnapshotSize());
+        assertEquals(1, leader.followerLogSize());
+        FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
+        assertNotNull(fli);
+        assertEquals(snapshotIndex, fli.getMatchIndex());
+        assertEquals(snapshotIndex, fli.getMatchIndex());
+        assertEquals(snapshotIndex + 1, fli.getNextIndex());
+    }
+
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        logStart("testSendSnapshotfromInstallSnapshotReply");
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        };
+        configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        actorContext.setConfigParams(configParams);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            assertEquals(1, objectList.size());
+        leader = new Leader(actorContext);
 
-            Object o = objectList.get(0);
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertEquals(2, objectList.size());
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        assertEquals(2, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            assertEquals(3, objectList.size());
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.getFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            // Count should still stay at 3
-            assertEquals(3, objectList.size());
-        }};
+        Assert.assertNull(installSnapshot);
     }
 
 
     @Test
     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
-        new JavaTestKit(getSystem()) {{
-
-            TestActorRef<MessageCollectorActor> followerActor =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                    followerActor.path().toString());
+        logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            MockRaftActorContext actorContext =
-                    (MockRaftActorContext) createActorContext();
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
-                @Override
-                public int getSnapshotChunkSize() {
-                    return 50;
-                }
-            });
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
 
-            MockLeader leader = new MockLeader(actorContext);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        leader = new Leader(actorContext);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-            MessageCollectorActor.getAllMatching(followerActor,
-                    InstallSnapshotMessages.InstallSnapshot.class);
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
-                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-            assertNotNull(installSnapshot);
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            followerActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                followerActor.path().toString(), -1, false));
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, -1, false));
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            installSnapshot = MessageCollectorActor.getFirstMatching(
-                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-            assertNotNull(installSnapshot);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            followerActor.tell(PoisonPill.getInstance(), getRef());
-        }};
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
     }
 
     @Test
     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
-
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+        logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
 
-                final int followersLastIndex = 2;
-                final int snapshotIndex = 3;
-                final int snapshotTerm = 1;
-                final int currentTerm = 2;
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-                    @Override
-                    public int getSnapshotChunkSize() {
-                        return 50;
-                    }
-                });
-                actorContext.setPeerAddresses(peerAddresses);
-                actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
 
-                MockLeader leader = new MockLeader(actorContext);
+        actorContext.setCommitIndex(followersLastIndex);
 
-                Map<String, String> leadersSnapshot = new HashMap<>();
-                leadersSnapshot.put("1", "A");
-                leadersSnapshot.put("2", "B");
-                leadersSnapshot.put("3", "C");
+        leader = new Leader(actorContext);
 
-                // set the snapshot variables in replicatedlog
-                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-                ByteString bs = toByteString(leadersSnapshot);
-                leader.setSnapshot(Optional.of(bs));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-                InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
-                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-                assertNotNull(installSnapshot);
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-                assertEquals(1, installSnapshot.getChunkIndex());
-                assertEquals(3, installSnapshot.getTotalChunks());
-                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-                int hashCode = installSnapshot.getData().hashCode();
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
 
-                followerActor.underlyingActor().clear();
+        int hashCode = installSnapshot.getData().hashCode();
 
-                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+        followerActor.underlyingActor().clear();
 
-                installSnapshot = MessageCollectorActor.getFirstMatching(
-                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-                assertNotNull(installSnapshot);
+        leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
+                FOLLOWER_ID, 1, true));
 
-                assertEquals(2, installSnapshot.getChunkIndex());
-                assertEquals(3, installSnapshot.getTotalChunks());
-                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-                followerActor.tell(PoisonPill.getInstance(), getRef());
-            }};
+        assertEquals(2, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
     }
 
     @Test
     public void testFollowerToSnapshotLogic() {
+        logStart("testFollowerToSnapshotLogic");
 
-        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+        MockRaftActorContext actorContext = createActorContext();
 
         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
@@ -847,7 +698,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             }
         });
 
-        MockLeader leader = new MockLeader(actorContext);
+        leader = new Leader(actorContext);
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -857,7 +708,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        leader.createFollowerToSnapshot("followerId", bs);
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+
         assertEquals(bs.size(), barray.length);
 
         int chunkIndex=0;
@@ -869,35 +722,46 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 j = barray.length;
             }
 
-            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            ByteString chunk = fts.getNextChunk();
             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
-            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+            assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
-            leader.getFollowerToSnapshot().markSendStatus(true);
-            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
-                leader.getFollowerToSnapshot().incrementChunkIndex();
+            fts.markSendStatus(true);
+            if (!fts.isLastChunk(chunkIndex)) {
+                fts.incrementChunkIndex();
             }
         }
 
-        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+        assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
     }
 
-
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
+    @Override
+    protected MockRaftActorContext createActorContext() {
         return createActorContext(leaderActor);
     }
 
     @Override
-    protected RaftActorContext createActorContext(ActorRef actorRef) {
+    protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+        return createActorContext("leader", actorRef);
+    }
+
+    private MockRaftActorContext createActorContextWithFollower() {
+        MockRaftActorContext actorContext = createActorContext();
+        actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+                followerActor.path().toString()).build());
+        return actorContext;
+    }
+
+    private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
         context.setConfigParams(configParams);
         return context;
     }
@@ -945,310 +809,248 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(ForwardMessageToBehaviorActor.class));
+        logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower", getSystem(), followerActor);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower", followerActor.path().toString());
+        leaderActorContext.setPeerAddresses(peerAddresses);
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
+        //create 3 entries
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            //create 3 entries
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
 
-            leaderActorContext.setCommitIndex(1);
+        followerActorContext.getReplicatedLog().removeFrom(0);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
+        // follower too has the exact same log entries and has the same commit index
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            // follower too has the exact same log entries and has the same commit index
-            followerActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        followerActorContext.setCommitIndex(1);
 
-            followerActorContext.setCommitIndex(1);
+        leader = new Leader(leaderActorContext);
 
-            Leader leader = new Leader(leaderActorContext);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(0, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(0, appendEntries.getPrevLogIndex());
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        // follower returns its next index
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            // follower returns its next index
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
-        }};
+        follower.close();
     }
 
-
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(ForwardMessageToBehaviorActor.class));
+        logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower", getSystem(), followerActor);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower", followerActor.path().toString());
+        leaderActorContext.setPeerAddresses(peerAddresses);
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
 
-            leaderActorContext.setCommitIndex(1);
+        followerActorContext.getReplicatedLog().removeFrom(0);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            followerActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        // follower has the same log entries but its commit index > leaders commit index
+        followerActorContext.setCommitIndex(2);
 
-            // follower has the same log entries but its commit index > leaders commit index
-            followerActorContext.setCommitIndex(2);
+        leader = new Leader(leaderActorContext);
 
-            Leader leader = new Leader(leaderActorContext);
+        // Initial heartbeat
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // Initial heartbeat
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(0, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(0, appendEntries.getPrevLogIndex());
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        leaderActor.underlyingActor().behavior = leader;
+        leader.handleMessage(followerActor, appendEntriesReply);
 
-            leaderActor.underlyingActor().behavior = leader;
-            leader.handleMessage(followerActor, appendEntriesReply);
+        leaderActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            leaderActor.underlyingActor().clear();
-            followerActor.underlyingActor().clear();
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                    TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(2, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(2, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(2, appendEntries.getPrevLogIndex());
+        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        assertEquals(2, followerActorContext.getCommitIndex());
 
-            assertEquals(1, followerActorContext.getCommitIndex());
-        }};
+        follower.close();
     }
 
     @Test
     public void testHandleAppendEntriesReplyFailure(){
-        new JavaTestKit(getSystem()) {
-            {
+        logStart("testHandleAppendEntriesReplyFailure");
 
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-                ActorRef followerActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        leader = new Leader(leaderActorContext);
 
+        // Send initial heartbeat reply with last index.
+        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
 
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put("follower-1",
-                    followerActor.path().toString());
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
 
-                leaderActorContext.setPeerAddresses(peerAddresses);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                Leader leader = new Leader(leaderActorContext);
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
-
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
-
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-
-            }};
+        assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
     }
 
     @Test
     public void testHandleAppendEntriesReplySuccess() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-                ActorRef followerActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
+        logStart("testHandleAppendEntriesReplySuccess");
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-                leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put("follower-1",
-                    followerActor.path().toString());
+        leaderActorContext.setCommitIndex(1);
+        leaderActorContext.setLastApplied(1);
+        leaderActorContext.getTermInformation().update(1, "leader");
 
-                leaderActorContext.setPeerAddresses(peerAddresses);
-                leaderActorContext.setCommitIndex(1);
-                leaderActorContext.setLastApplied(1);
-                leaderActorContext.getTermInformation().update(1, "leader");
+        leader = new Leader(leaderActorContext);
 
-                Leader leader = new Leader(leaderActorContext);
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
+        assertEquals(2, leaderActorContext.getCommitIndex());
 
-                assertEquals(2, leaderActorContext.getCommitIndex());
+        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyLogEntries.class);
 
-                ApplyLogEntries applyLogEntries =
-                    MessageCollectorActor.getFirstMatching(leaderActor,
-                    ApplyLogEntries.class);
+        assertEquals(2, leaderActorContext.getLastApplied());
 
-                assertNotNull(applyLogEntries);
+        assertEquals(2, applyLogEntries.getToIndex());
 
-                assertEquals(2, leaderActorContext.getLastApplied());
+        List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+                ApplyState.class);
 
-                assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(1,applyStateList.size());
 
-                List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
-                    ApplyState.class);
+        ApplyState applyState = applyStateList.get(0);
 
-                assertEquals(1,applyStateList.size());
-
-                ApplyState applyState = (ApplyState) applyStateList.get(0);
-
-                assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
-
-            }};
+        assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
     }
 
     @Test
     public void testHandleAppendEntriesReplyUnknownFollower(){
-        new JavaTestKit(getSystem()) {
-            {
+        logStart("testHandleAppendEntriesReplyUnknownFollower");
 
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        leader = new Leader(leaderActorContext);
 
-                Leader leader = new Leader(leaderActorContext);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
-
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-
-            }};
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
     @Test
     public void testHandleRequestVoteReply(){
-        new JavaTestKit(getSystem()) {
-            {
-
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        logStart("testHandleRequestVoteReply");
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-                Leader leader = new Leader(leaderActorContext);
+        leader = new Leader(leaderActorContext);
 
-                RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+        // Should be a no-op.
+        RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+                new RequestVoteReply(1, true));
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+        raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-            }};
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
     @Test
     public void testIsolatedLeaderCheckNoFollowers() {
-        new JavaTestKit(getSystem()) {{
-            ActorRef leaderActor = getTestActor();
-
-            MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+        logStart("testIsolatedLeaderCheckNoFollowers");
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-            Leader leader = new Leader(leaderActorContext);
-            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-            Assert.assertTrue(behavior instanceof Leader);
-        }};
+        leader = new Leader(leaderActorContext);
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+        Assert.assertTrue(behavior instanceof Leader);
     }
 
     @Test
     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        logStart("testIsolatedLeaderCheckTwoFollowers");
+
         new JavaTestKit(getSystem()) {{
 
             ActorRef followerActor1 = getTestActor();
             ActorRef followerActor2 = getTestActor();
 
-            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            MockRaftActorContext leaderActorContext = createActorContext();
 
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-1", followerActor1.path().toString());
@@ -1256,7 +1058,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
-            Leader leader = new Leader(leaderActorContext);
+            leader = new Leader(leaderActorContext);
             leader.stopIsolatedLeaderCheckSchedule();
 
             leader.markFollowerActive("follower-1");
@@ -1289,118 +1091,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
                 behavior instanceof IsolatedLeader);
-
         }};
     }
 
 
     @Test
     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(MessageCollectorActor.class));
-
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
-
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
-            leaderActorContext.setConfigParams(configParams);
+        logStart("testAppendEntryCallAtEndofAppendEntryReply");
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower-reply", getSystem(), followerActor);
-
-            followerActorContext.setConfigParams(configParams);
-
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower-reply",
-                    followerActor.path().toString());
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setConfigParams(configParams);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
-            leaderActorContext.setCommitIndex(-1);
-            leaderActorContext.setLastApplied(-1);
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
-            followerActorContext.setCommitIndex(-1);
-            followerActorContext.setLastApplied(-1);
+        followerActorContext.setConfigParams(configParams);
 
-            Leader leader = new Leader(leaderActorContext);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
-            System.out.println("appendEntriesReply: "+appendEntriesReply);
-            leader.handleMessage(followerActor, appendEntriesReply);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
 
-            // Clear initial heartbeat messages
+        followerActorContext.getReplicatedLog().removeFrom(0);
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
 
-            leaderActor.underlyingActor().clear();
-            followerActor.underlyingActor().clear();
+        leader = new Leader(leaderActorContext);
 
-            // create 3 entries
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-            leaderActorContext.setCommitIndex(1);
-            leaderActorContext.setLastApplied(1);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                    TimeUnit.MILLISECONDS);
+        leader.handleMessage(followerActor, appendEntriesReply);
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        // Clear initial heartbeat messages
 
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        leaderActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            // Should send first log entry
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-            assertEquals(-1, appendEntries.getPrevLogIndex());
+        // create 3 entries
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
+        leaderActorContext.setLastApplied(1);
 
-            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
-            assertEquals(0, appendEntriesReply.getLogLastIndex());
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            followerActor.underlyingActor().clear();
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+        // Should send first log entry
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals(-1, appendEntries.getPrevLogIndex());
 
-            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-            // Should send second log entry
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-        }};
-    }
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
+        assertEquals(0, appendEntriesReply.getLogLastIndex());
 
-    class MockLeader extends Leader {
+        followerActor.underlyingActor().clear();
 
-        FollowerToSnapshot fts;
+        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
 
-        public MockLeader(RaftActorContext context){
-            super(context);
-        }
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        public FollowerToSnapshot getFollowerToSnapshot() {
-            return fts;
-        }
+        // Should send second log entry
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
 
-        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
-            fts = new FollowerToSnapshot(bs);
-            setFollowerSnapshot(followerId, fts);
-        }
+        follower.close();
     }
 
     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {