Refactor LeaderTest 08/15208/3
authortpantelis <tpanteli@brocade.com>
Tue, 10 Feb 2015 13:15:07 +0000 (08:15 -0500)
committertpantelis <tpanteli@brocade.com>
Wed, 18 Feb 2015 02:47:17 +0000 (21:47 -0500)
Removed the remaining Within blocks in tests.

Changed each test to use the leaderActor and followerActor class members for
consistency. The actors are killed on tearDown. The creation of a
JavaTestKit block is no longer necessary and was removed from each test.

Added a shorcut method createActorContextWithFollower that creates the
leader context and adds a peer follower. This eliminates some duplicate code
in the tests.

Added an expectFirstMatching method to MessageCollector that waits for
the message in contrast to getFirstMatching.

Added some more assertions in some tests.

Change-Id: Id4ee0291352999b31e40abd9895e3d0237acf432
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestActorFactory.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehaviorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java

index 935c4f0bbd7495de286196a0ce65e2502cf93f95..04b9f163f4b6ad7be0f423e485902f7b3d5ccb79 100644 (file)
@@ -100,4 +100,16 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
     public long timeSinceLastActivity() {
         return stopwatch.elapsed(TimeUnit.MILLISECONDS);
     }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("FollowerLogInformationImpl [id=").append(id).append(", nextIndex=").append(nextIndex)
+                .append(", matchIndex=").append(matchIndex).append(", stopwatch=")
+                .append(stopwatch.elapsed(TimeUnit.MILLISECONDS))
+                .append(", followerTimeoutMillis=").append(followerTimeoutMillis).append("]");
+        return builder.toString();
+    }
+
+
 }
index 4d33152b41ca1a575d086a5ca04be8437e25d52b..e10fb8d2930556a0fa507d209de9ec5b98f6c947 100644 (file)
@@ -53,7 +53,7 @@ public class MockRaftActorContext implements RaftActorContext {
              * Identifier of the actor whose election term information this is
              */
             private final String id = id1;
-            private long currentTerm = 0;
+            private long currentTerm = 1;
             private String votedFor = "";
 
             @Override
@@ -88,8 +88,9 @@ public class MockRaftActorContext implements RaftActorContext {
 
     public void initReplicatedLog(){
         this.replicatedLog = new SimpleReplicatedLog();
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 0, new MockPayload("1")));
-        this.replicatedLog.append(new MockReplicatedLogEntry(1, 1, new MockPayload("2")));
+        long term = getTermInformation().getCurrentTerm();
+        this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
+        this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
     }
 
     @Override public ActorRef actorOf(Props props) {
index 59046fd779fb1319ed4d8f48bb8f6911b9eae301..a0663efdd5919bcb21cab32407aed4460536105b 100644 (file)
@@ -897,24 +897,24 @@ public class RaftActorTest extends AbstractActorTest {
             // sleeping for a minimum of 2 seconds, if it spans more its fine.
             Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
-            List<Object> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
+            List<RoleChanged> matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
             assertNotNull(matches);
             assertEquals(3, matches.size());
 
             // check if the notifier got a role change from null to Follower
-            RoleChanged raftRoleChanged = (RoleChanged) matches.get(0);
+            RoleChanged raftRoleChanged = matches.get(0);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertNull(raftRoleChanged.getOldRole());
             assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
 
             // check if the notifier got a role change from Follower to Candidate
-            raftRoleChanged = (RoleChanged) matches.get(1);
+            raftRoleChanged = matches.get(1);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Follower.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getNewRole());
 
             // check if the notifier got a role change from Candidate to Leader
-            raftRoleChanged = (RoleChanged) matches.get(2);
+            raftRoleChanged = matches.get(2);
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
index 6872c8fa4528831fda59d08ad0c3b1864054a49e..b47df13fed8c97297abf66c68bc837a03988dfb9 100644 (file)
@@ -21,6 +21,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import java.util.LinkedList;
 import java.util.List;
@@ -108,10 +109,14 @@ public class TestActorFactory implements AutoCloseable {
     }
 
     @Override
-    public void close() throws Exception {
-        for(ActorRef actor : createdActors){
-            LOG.info("Killing actor {}", actor);
-            actor.tell(PoisonPill.getInstance(), null);
-        }
+    public void close() {
+        new JavaTestKit(system) {{
+            for(ActorRef actor : createdActors) {
+                watch(actor);
+                LOG.info("Killing actor {}", actor);
+                actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                expectTerminated(duration("5 seconds"), actor);
+            }
+        }};
     }
 }
\ No newline at end of file
index 42a7911be31f3411c3467435bec446dfef15ecb7..17bfebb816b57049876d93cdfe19252a13bf5520 100644 (file)
@@ -1,8 +1,12 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.AbstractActorTest;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
@@ -17,12 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
 
     private final ActorRef behaviorActor = getSystem().actorOf(Props.create(
@@ -67,8 +65,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+            MockRaftActorContext context = createActorContext();
 
             // First set the receivers term to a high number (1000)
             context.getTermInformation().update(1000, "test");
@@ -90,6 +87,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                 "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -112,8 +110,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {
             {
 
-                MockRaftActorContext context = (MockRaftActorContext)
-                    createActorContext();
+                MockRaftActorContext context = createActorContext();
 
                 // First set the receivers term to lower number
                 context.getTermInformation().update(2, "test");
@@ -169,6 +166,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorBehavior behavior = createBehavior(
@@ -185,6 +183,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                             new ExpectMsg<Boolean>(duration("1 seconds"),
                                 "RequestVoteReply") {
                                 // do not put code outside this method, will run afterwards
+                                @Override
                                 protected Boolean match(Object in) {
                                     if (in instanceof RequestVoteReply) {
                                         RequestVoteReply reply =
@@ -213,6 +212,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext actorContext =
@@ -238,6 +238,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                             new ExpectMsg<Boolean>(duration("1 seconds"),
                                 "RequestVoteReply") {
                                 // do not put code outside this method, will run afterwards
+                                @Override
                                 protected Boolean match(Object in) {
                                     if (in instanceof RequestVoteReply) {
                                         RequestVoteReply reply =
@@ -268,6 +269,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext context =
@@ -284,6 +286,7 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
                         new ExpectMsg<Boolean>(duration("1 seconds"),
                             "RequestVoteReply") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected Boolean match(Object in) {
                                 if (in instanceof RequestVoteReply) {
                                     RequestVoteReply reply =
@@ -371,11 +374,11 @@ public abstract class AbstractRaftActorBehaviorTest extends AbstractActorTest {
         return createBehavior(createActorContext());
     }
 
-    protected RaftActorContext createActorContext() {
+    protected MockRaftActorContext createActorContext() {
         return new MockRaftActorContext();
     }
 
-    protected RaftActorContext createActorContext(ActorRef actor) {
+    protected MockRaftActorContext createActorContext(ActorRef actor) {
         return new MockRaftActorContext("test", getSystem(), actor);
     }
 
index 0dc68c2461c2235b22e663b39ad51220e96c80b5..0a715b2d04416075da5a9344ada15af6a0b688f1 100644 (file)
@@ -1,5 +1,6 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
@@ -20,8 +21,6 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 
-import static org.junit.Assert.assertEquals;
-
 public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
     private final ActorRef candidateActor = getSystem().actorOf(Props.create(
@@ -81,12 +80,14 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
+                @Override
                 protected void run() {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
 
                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
                                  return true;
@@ -117,7 +118,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testHandleElectionTimeoutWhenThereAreTwoNodesInCluster(){
         MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
+            createActorContext();
         raftActorContext.setPeerAddresses(onePeer);
         Candidate candidate =
             new Candidate(raftActorContext);
@@ -131,7 +132,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testBecomeLeaderOnReceivingMajorityVotesInThreeNodesInCluster(){
         MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
+            createActorContext();
         raftActorContext.setPeerAddresses(twoPeers);
         Candidate candidate =
             new Candidate(raftActorContext);
@@ -145,7 +146,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
     @Test
     public void testBecomeLeaderOnReceivingMajorityVotesInFiveNodesInCluster(){
         MockRaftActorContext raftActorContext =
-            (MockRaftActorContext) createActorContext();
+            createActorContext();
         raftActorContext.setPeerAddresses(fourPeers);
         Candidate candidate =
             new Candidate(raftActorContext);
@@ -164,6 +165,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
@@ -172,6 +174,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof AppendEntriesReply) {
                                 AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -193,6 +196,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     Candidate candidate = new Candidate(createActorContext(getTestActor()));
@@ -201,6 +205,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "AppendEntriesResponse") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof RequestVoteReply) {
                                 RequestVoteReply reply = (RequestVoteReply) in;
@@ -222,6 +227,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext context = createActorContext(getTestActor());
@@ -236,6 +242,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof RequestVoteReply) {
                                 RequestVoteReply reply = (RequestVoteReply) in;
@@ -257,6 +264,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext context = createActorContext(getTestActor());
@@ -269,6 +277,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof RequestVoteReply) {
                                 RequestVoteReply reply = (RequestVoteReply) in;
@@ -291,7 +300,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest {
         return new Candidate(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
+    @Override protected MockRaftActorContext createActorContext() {
         return new MockRaftActorContext("test", getSystem(), candidateActor);
     }
 
index 719a8256a0757f406777a8d03b52fab879009c74..6b0857351df132fd30b10406dae2acfe3cbd1237 100644 (file)
@@ -41,11 +41,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         return new Follower(actorContext);
     }
 
-    @Override protected  RaftActorContext createActorContext() {
+    @Override
+    protected  MockRaftActorContext createActorContext() {
         return createActorContext(followerActor);
     }
 
-    protected  RaftActorContext createActorContext(ActorRef actorRef){
+    @Override
+    protected  MockRaftActorContext createActorContext(ActorRef actorRef){
         return new MockRaftActorContext("test", getSystem(), actorRef);
     }
 
@@ -54,12 +56,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) {
+                @Override
                 protected void run() {
 
                     Follower follower = new Follower(createActorContext(getTestActor()));
 
                     final Boolean out = new ExpectMsg<Boolean>(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof ElectionTimeout) {
                                 return true;
@@ -92,6 +96,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext context = createActorContext(getTestActor());
@@ -104,6 +109,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof RequestVoteReply) {
                                 RequestVoteReply reply = (RequestVoteReply) in;
@@ -125,6 +131,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         new JavaTestKit(getSystem()) {{
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     RaftActorContext context = createActorContext(getTestActor());
@@ -137,6 +144,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"), "RequestVoteReply") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in instanceof RequestVoteReply) {
                                 RequestVoteReply reply = (RequestVoteReply) in;
@@ -203,8 +211,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+            MockRaftActorContext context = createActorContext();
 
             // First set the receivers term to lower number
             context.getTermInformation().update(95, "test");
@@ -233,6 +240,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                 "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -263,8 +271,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testHandleAppendEntriesAddNewEntries() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+            MockRaftActorContext context = createActorContext();
 
             // First set the receivers term to lower number
             context.getTermInformation().update(1, "test");
@@ -312,6 +319,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                 "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -343,8 +351,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext();
+            MockRaftActorContext context = createActorContext();
 
             // First set the receivers term to lower number
             context.getTermInformation().update(2, "test");
@@ -405,6 +412,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                 "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -425,8 +433,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testHandleAppendEntriesPreviousLogEntryMissing(){
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                    createActorContext();
+            MockRaftActorContext context = createActorContext();
 
             // Prepare the receivers log
             MockRaftActorContext.SimpleReplicatedLog log =
@@ -462,6 +469,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                     "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -482,8 +490,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     public void testHandleAppendAfterInstallingSnapshot(){
         new JavaTestKit(getSystem()) {{
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                    createActorContext();
+            MockRaftActorContext context = createActorContext();
 
 
             // Prepare the receivers log
@@ -518,6 +525,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
                     "AppendEntriesReply") {
                 // do not put code outside this method, will run afterwards
+                @Override
                 protected Boolean match(Object in) {
                     if (in instanceof AppendEntriesReply) {
                         AppendEntriesReply reply = (AppendEntriesReply) in;
@@ -548,8 +556,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             ActorRef leaderActor = getSystem().actorOf(Props.create(
                 MessageCollectorActor.class));
 
-            MockRaftActorContext context = (MockRaftActorContext)
-                createActorContext(getRef());
+            MockRaftActorContext context = createActorContext(getRef());
 
             Follower follower = (Follower)createBehavior(context);
 
@@ -641,8 +648,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 ActorRef leaderActor = getSystem().actorOf(Props.create(
                         MessageCollectorActor.class));
 
-                MockRaftActorContext context = (MockRaftActorContext)
-                        createActorContext(getRef());
+                MockRaftActorContext context = createActorContext(getRef());
 
                 Follower follower = (Follower) createBehavior(context);
 
index 708068a78952db5ba287501c1a534a31721556ad..6197f980ea51dbcbf38c424a13b4e9259240d4a7 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
@@ -18,15 +20,13 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
 
-    private ActorRef leaderActor =
+    private final ActorRef leaderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
 
-    private ActorRef senderActor =
+    private final ActorRef senderActor =
         getSystem().actorOf(Props.create(DoNothingActor.class));
 
     @Override
@@ -36,7 +36,7 @@ public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected RaftActorContext createActorContext() {
+    protected MockRaftActorContext createActorContext() {
         return createActorContext(leaderActor);
     }
 
@@ -47,7 +47,7 @@ public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
             String followerAddress1 = "akka://test/user/$a";
             String followerAddress2 = "akka://test/user/$b";
 
-            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            MockRaftActorContext leaderActorContext = createActorContext();
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-1", followerAddress1);
             peerAddresses.put("follower-2", followerAddress2);
@@ -80,7 +80,7 @@ public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
             String followerAddress3 = "akka://test/user/$c";
             String followerAddress4 = "akka://test/user/$d";
 
-            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            MockRaftActorContext leaderActorContext = createActorContext();
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-1", followerAddress1);
             peerAddresses.put("follower-2", followerAddress2);
@@ -118,7 +118,7 @@ public class IsolatedLeaderTest  extends AbstractRaftActorBehaviorTest {
             String followerAddress1 = "akka://test/user/$a";
             String followerAddress2 = "akka://test/user/$b";
 
-            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            MockRaftActorContext leaderActorContext = createActorContext();
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-1", followerAddress1);
             peerAddresses.put("follower-2", followerAddress2);
index 119b958799e815d21b22c18dae11e9ff7415d928..f087793674ec7f304192ec55871379f1adbfc03b 100644 (file)
@@ -10,6 +10,7 @@ import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayOutputStream;
@@ -19,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -28,6 +30,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -35,810 +38,658 @@ import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderChec
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
-    private final ActorRef leaderActor =
-        getSystem().actorOf(Props.create(DoNothingActor.class));
-    private final ActorRef senderActor =
-        getSystem().actorOf(Props.create(DoNothingActor.class));
+    static final String FOLLOWER_ID = "follower";
 
-    @Test
-    public void testHandleMessageForUnknownMessage() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            Leader leader =
-                new Leader(createActorContext());
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
-            // handle message should return the Leader state when it receives an
-            // unknown message
-            RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
-            Assert.assertTrue(behavior instanceof Leader);
-        }};
+    private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
+
+    private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+            Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
+
+    private Leader leader;
+
+    @After
+    public void tearDown() throws Exception {
+        if(leader != null) {
+            leader.close();
+        }
+
+        actorFactory.close();
+    }
+
+    private void logStart(String name) {
+        LoggerFactory.getLogger(LeaderTest.class).info("Starting " + name);
     }
 
     @Test
-    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
+    public void testHandleMessageForUnknownMessage() throws Exception {
+        logStart("testHandleMessageForUnknownMessage");
+
+        leader = new Leader(createActorContext());
 
-                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+        // handle message should return the Leader state when it receives an
+        // unknown message
+        RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
+        Assert.assertTrue(behavior instanceof Leader);
+    }
 
-                    Map<String, String> peerAddresses = new HashMap<>();
+    @Test
+    public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+        logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
-                    String followerId = "follower";
-                    peerAddresses.put(followerId, followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                    actorContext.setPeerAddresses(peerAddresses);
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
 
-                    long term = 1;
-                    actorContext.getTermInformation().update(term, "");
+        leader = new Leader(actorContext);
 
-                    Leader leader = new Leader(actorContext);
+        // Leader should send an immediate heartbeat with no entries as follower is inactive.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getTerm", term, appendEntries.getTerm());
+        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 0, appendEntries.getEntries().size());
 
-                    // Leader should send an immediate heartbeat with no entries as follower is inactive.
-                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
-                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getTerm", term, appendEntries.getTerm());
-                    assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 0, appendEntries.getEntries().size());
+        // The follower would normally reply - simulate that explicitly here.
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex - 1, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
-                    // The follower would normally reply - simulate that explicitly here.
-                    leader.handleMessage(followerActor, new AppendEntriesReply(
-                            followerId, term, true, lastIndex - 1, term));
-                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+        followerActor.underlyingActor().clear();
 
-                    // Sleep for the heartbeat interval so AppendEntries is sent.
-                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
-                            getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+        // Sleep for the heartbeat interval so AppendEntries is sent.
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+                getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
-                    leader.handleMessage(senderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-                    appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
-                    assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
-                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-                }
-            };
-        }};
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
     }
 
     @Test
-    public void testHandleReplicateMessageSendAppendEntriesToFollower() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    ActorRef followerActor = getTestActor();
-
-                    MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
-
-                    Map<String, String> peerAddresses = new HashMap<>();
+    public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+        logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
 
-                    String followerId = "follower";
-                    peerAddresses.put(followerId, followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                    actorContext.setPeerAddresses(peerAddresses);
+        long term = 1;
+        actorContext.getTermInformation().update(term, "");
 
-                    long term = 1;
-                    actorContext.getTermInformation().update(term, "");
+        leader = new Leader(actorContext);
 
-                    Leader leader = new Leader(actorContext);
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-                    // Leader will send an immediate heartbeat - ignore it.
-                    expectMsgClass(duration("5 seconds"), AppendEntries.class);
+        // The follower would normally reply - simulate that explicitly here.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, term, true, lastIndex, term));
+        assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
 
-                    // The follower would normally reply - simulate that explicitly here.
-                    long lastIndex = actorContext.getReplicatedLog().lastIndex();
-                    leader.handleMessage(followerActor, new AppendEntriesReply(
-                            followerId, term, true, lastIndex, term));
-                    assertEquals("isFollowerActive", true, leader.getFollower(followerId).isFollowerActive());
+        followerActor.underlyingActor().clear();
 
-                    MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
-                    MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                            1, lastIndex + 1, payload);
-                    actorContext.getReplicatedLog().append(newEntry);
-                    RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                            new Replicate(null, null, newEntry));
+        MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                1, lastIndex + 1, payload);
+        actorContext.getReplicatedLog().append(newEntry);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(null, null, newEntry));
 
-                    // State should not change
-                    assertTrue(raftBehavior instanceof Leader);
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
 
-                    AppendEntries appendEntries = expectMsgClass(duration("5 seconds"), AppendEntries.class);
-                    assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
-                    assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
-                    assertEquals("Entries size", 1, appendEntries.getEntries().size());
-                    assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
-                    assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
-                    assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
-                }
-            };
-        }};
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData());
     }
 
     @Test
-    public void testHandleReplicateMessageWhenThereAreNoFollowers() {
-        new JavaTestKit(getSystem()) {{
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
+    public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+        logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
 
-                    ActorRef raftActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContext();
 
-                    MockRaftActorContext actorContext =
-                        new MockRaftActorContext("test", getSystem(), raftActor);
+        leader = new Leader(actorContext);
 
-                    actorContext.getReplicatedLog().removeFrom(0);
+        actorContext.setLastApplied(0);
 
-                    actorContext.setReplicatedLog(
-                        new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
-                            .build());
+        long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+        long term = actorContext.getTermInformation().getCurrentTerm();
+        MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+                term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
 
-                    Leader leader = new Leader(actorContext);
-                    RaftActorBehavior raftBehavior = leader
-                        .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
+        actorContext.getReplicatedLog().append(newEntry);
 
-                    // State should not change
-                    assertTrue(raftBehavior instanceof Leader);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new Replicate(leaderActor, "state-id", newEntry));
 
-                    assertEquals(1, actorContext.getCommitIndex());
+        // State should not change
+        assertTrue(raftBehavior instanceof Leader);
 
-                    final String out =
-                        new ExpectMsg<String>(duration("1 seconds"),
-                            "match hint") {
-                            // do not put code outside this method, will run afterwards
-                            @Override
-                            protected String match(Object in) {
-                                if (in instanceof ApplyState) {
-                                    if (((ApplyState) in).getIdentifier().equals("state-id")) {
-                                        return "match";
-                                    }
-                                    return null;
-                                } else {
-                                    throw noMatch();
-                                }
-                            }
-                        }.get(); // this extracts the received message
+        assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
 
-                    assertEquals("match", out);
+        // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
+        // one since lastApplied state is 0.
+        List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
+                leaderActor, ApplyState.class);
+        assertEquals("ApplyState count", newLogIndex, applyStateList.size());
 
-                }
-            };
-        }};
+        for(int i = 0; i <= newLogIndex - 1; i++ ) {
+            ApplyState applyState = applyStateList.get(i);
+            assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
+            assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+        }
+
+        ApplyState last = applyStateList.get((int) newLogIndex - 1);
+        assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+        assertEquals("getIdentifier", "state-id", last.getIdentifier());
     }
 
     @Test
     public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext(leaderActor);
-            actorContext.setPeerAddresses(peerAddresses);
-
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
-
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setCommitIndex(followersLastIndex);
-            //set follower timeout to 2 mins, helps during debugging
-            actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
-
-            MockLeader leader = new MockLeader(actorContext);
-
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
-            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            //send first chunk and no InstallSnapshotReply received yet
-            leader.getFollowerToSnapshot().getNextChunk();
-            leader.getFollowerToSnapshot().incrementChunkIndex();
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setCommitIndex(followersLastIndex);
+        //set follower timeout to 2 mins, helps during debugging
+        actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
-            AppendEntries aeproto = MessageCollectorActor.getFirstMatching(
-                followerActor, AppendEntries.class);
+        leader = new Leader(actorContext);
 
-            assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
-                "received", aeproto);
+        // new entry
+        ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                        new MockRaftActorContext.MockPayload("D"));
 
-            AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
-            //InstallSnapshotReply received
-            leader.getFollowerToSnapshot().markSendStatus(true);
+        //send first chunk and no InstallSnapshotReply received yet
+        fts.getNextChunk();
+        fts.incrementChunkIndex();
 
-            leader.handleMessage(senderActor, new SendHeartBeat());
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.getFirstMatching(followerActor,
-                InstallSnapshot.SERIALIZABLE_CLASS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
-                isproto);
+        AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+        AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
 
-            assertEquals(snapshotIndex, is.getLastIncludedIndex());
+        assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
-        }};
-    }
+        //InstallSnapshotReply received
+        fts.markSendStatus(true);
 
-    @Test
-    public void testSendAppendEntriesSnapshotScenario() {
-        new JavaTestKit(getSystem()) {{
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            ActorRef followerActor = getTestActor();
+        InstallSnapshotMessages.InstallSnapshot isproto = MessageCollectorActor.expectFirstMatching(followerActor,
+                InstallSnapshot.SERIALIZABLE_CLASS);
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+        InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
 
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext(getRef());
-            actorContext.setPeerAddresses(peerAddresses);
+        assertEquals(snapshotIndex, is.getLastIncludedIndex());
+    }
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+    @Test
+    public void testSendAppendEntriesSnapshotScenario() throws Exception {
+        logStart("testSendAppendEntriesSnapshotScenario");
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setCommitIndex(followersLastIndex);
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            Leader leader = new Leader(actorContext);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        leader = new Leader(actorContext);
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                TimeUnit.MILLISECONDS);
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
-            RaftActorBehavior raftBehavior = leader.handleMessage(
-                senderActor, new Replicate(null, "state-id", entry));
+        // new entry
+        ReplicatedLogImplEntry entry =
+                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                        new MockRaftActorContext.MockPayload("D"));
 
-            assertTrue(raftBehavior instanceof Leader);
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            // we might receive some heartbeat messages, so wait till we get CaptureSnapshot
-            Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
-                @Override
-                protected Boolean match(Object o) throws Exception {
-                    if (o instanceof CaptureSnapshot) {
-                        return true;
-                    }
-                    return false;
-                }
-            }.get();
+        // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+        RaftActorBehavior raftBehavior = leader.handleMessage(
+                leaderActor, new Replicate(null, "state-id", entry));
 
-            boolean captureSnapshot = false;
-            for (Boolean b: matches) {
-                captureSnapshot = b | captureSnapshot;
-            }
+        assertTrue(raftBehavior instanceof Leader);
 
-            assertTrue(captureSnapshot);
-        }};
+        MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
     }
 
     @Test
     public void testInitiateInstallSnapshot() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        logStart("testInitiateInstallSnapshot");
 
-            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ActorRef followerActor = getTestActor();
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(), followerActor.path().toString());
-
-            MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext(leaderActor);
-            actorContext.setPeerAddresses(peerAddresses);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setLastApplied(3);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.setLastApplied(3);
-            actorContext.setCommitIndex(followersLastIndex);
+        leader = new Leader(actorContext);
 
-            Leader leader = new Leader(actorContext);
-            // set the snapshot as absent and check if capture-snapshot is invoked.
-            leader.setSnapshot(Optional.<ByteString>absent());
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // new entry
-            ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        // set the snapshot as absent and check if capture-snapshot is invoked.
+        leader.setSnapshot(Optional.<ByteString>absent());
 
-            actorContext.getReplicatedLog().append(entry);
+        // new entry
+        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                new MockRaftActorContext.MockPayload("D"));
 
-            //update follower timestamp
-            leader.markFollowerActive(followerActor.path().toString());
+        actorContext.getReplicatedLog().append(entry);
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(
-                    senderActor, new Replicate(null, "state-id", entry));
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
 
-            CaptureSnapshot cs = MessageCollectorActor.
-                getFirstMatching(leaderActor, CaptureSnapshot.class);
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-            assertNotNull(cs);
+        CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class);
 
-            assertTrue(cs.isInstallSnapshotInitiated());
-            assertEquals(3, cs.getLastAppliedIndex());
-            assertEquals(1, cs.getLastAppliedTerm());
-            assertEquals(4, cs.getLastIndex());
-            assertEquals(2, cs.getLastTerm());
+        assertTrue(cs.isInstallSnapshotInitiated());
+        assertEquals(3, cs.getLastAppliedIndex());
+        assertEquals(1, cs.getLastAppliedTerm());
+        assertEquals(4, cs.getLastIndex());
+        assertEquals(2, cs.getLastTerm());
 
-            // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-            leader.handleMessage(senderActor, new Replicate(null, "state-id", entry));
-            List<Object> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
-            assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
+        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
 
-        }};
+        List<CaptureSnapshot> captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class);
+        assertEquals("CaptureSnapshot should not get invoked when  initiate is in progress", 1, captureSnapshots.size());
     }
 
     @Test
-    public void testInstallSnapshot() {
-        new JavaTestKit(getSystem()) {{
+    public void testInstallSnapshot() throws Exception {
+        logStart("testInstallSnapshot");
 
-            ActorRef followerActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            actorContext.setPeerAddresses(peerAddresses);
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(followersLastIndex);
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        leader = new Leader(actorContext);
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        // Ignore initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-            actorContext.setCommitIndex(followersLastIndex);
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+                new SendInstallSnapshot(toByteString(leadersSnapshot)));
 
-            Leader leader = new Leader(actorContext);
+        assertTrue(raftBehavior instanceof Leader);
 
-            // Ignore initial heartbeat.
-            expectMsgClass(duration("5 seconds"), AppendEntries.class);
+        // check if installsnapshot gets called with the correct values.
 
-            // new entry
-            ReplicatedLogImplEntry entry =
-                new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
-                    new MockRaftActorContext.MockPayload("D"));
+        InstallSnapshot installSnapshot = (InstallSnapshot) SerializationUtils.fromSerializable(
+                MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshotMessages.InstallSnapshot.class));
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+        assertNotNull(installSnapshot.getData());
+        assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
 
-            assertTrue(raftBehavior instanceof Leader);
-
-            // check if installsnapshot gets called with the correct values.
-            final String out =
-                new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-                    // do not put code outside this method, will run afterwards
-                    @Override
-                    protected String match(Object in) {
-                        if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
-                            InstallSnapshot is = (InstallSnapshot)
-                                SerializationUtils.fromSerializable(in);
-                            if (is.getData() == null) {
-                                return "InstallSnapshot data is null";
-                            }
-                            if (is.getLastIncludedIndex() != snapshotIndex) {
-                                return is.getLastIncludedIndex() + "!=" + snapshotIndex;
-                            }
-                            if (is.getLastIncludedTerm() != snapshotTerm) {
-                                return is.getLastIncludedTerm() + "!=" + snapshotTerm;
-                            }
-                            if (is.getTerm() == currentTerm) {
-                                return is.getTerm() + "!=" + currentTerm;
-                            }
-
-                            return "match";
-
-                        } else {
-                            return "message mismatch:" + in.getClass();
-                        }
-                    }
-                }.get(); // this extracts the received message
-
-            assertEquals("match", out);
-        }};
+        // FIXME - we don't set the term in the serialized message.
+        //assertEquals(currentTerm, installSnapshot.getTerm());
     }
 
     @Test
-    public void testHandleInstallSnapshotReplyLastChunk() {
-        new JavaTestKit(getSystem()) {{
+    public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
+        logStart("testHandleInstallSnapshotReplyLastChunk");
 
-            ActorRef followerActor = getTestActor();
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                followerActor.path().toString());
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int newEntryIndex = 4;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
-
-            MockLeader leader = new MockLeader(actorContext);
-
-            // Ignore initial heartbeat.
-            expectMsgClass(duration("5 seconds"), AppendEntries.class);
-
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
-
-            // set the snapshot variables in replicatedlog
-
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
-            leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
-            while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
-                leader.getFollowerToSnapshot().getNextChunk();
-                leader.getFollowerToSnapshot().incrementChunkIndex();
-            }
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            //clears leaders log
-            actorContext.getReplicatedLog().removeFrom(0);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
-                new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
-                    leader.getFollowerToSnapshot().getChunkIndex(), true));
+        leader = new Leader(actorContext);
 
-            assertTrue(raftBehavior instanceof Leader);
+        // Ignore initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            assertEquals(0, leader.followerSnapshotSize());
-            assertEquals(1, leader.followerLogSize());
-            assertNotNull(leader.getFollower(followerActor.path().toString()));
-            FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
-            assertEquals(snapshotIndex, fli.getMatchIndex());
-            assertEquals(snapshotIndex, fli.getMatchIndex());
-            assertEquals(snapshotIndex + 1, fli.getNextIndex());
-        }};
-    }
-    @Test
-    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
-        new JavaTestKit(getSystem()) {{
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            TestActorRef<MessageCollectorActor> followerActor =
-                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+        // set the snapshot variables in replicatedlog
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower-reply",
-                followerActor.path().toString());
-
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
-
-            MockRaftActorContext actorContext =
-                (MockRaftActorContext) createActorContext();
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
-                @Override
-                public int getSnapshotChunkSize() {
-                    return 50;
-                }
-            };
-            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        while(!fts.isLastChunk(fts.getChunkIndex())) {
+            fts.getNextChunk();
+            fts.incrementChunkIndex();
+        }
+
+        //clears leaders log
+        actorContext.getReplicatedLog().removeFrom(0);
 
-            actorContext.setConfigParams(configParams);
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
+        RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
+                new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
 
-            MockLeader leader = new MockLeader(actorContext);
+        assertTrue(raftBehavior instanceof Leader);
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        assertEquals(0, leader.followerSnapshotSize());
+        assertEquals(1, leader.followerLogSize());
+        FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
+        assertNotNull(fli);
+        assertEquals(snapshotIndex, fli.getMatchIndex());
+        assertEquals(snapshotIndex, fli.getMatchIndex());
+        assertEquals(snapshotIndex + 1, fli.getNextIndex());
+    }
+
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        logStart("testSendSnapshotfromInstallSnapshotReply");
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        };
+        configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        actorContext.setConfigParams(configParams);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            assertEquals(1, objectList.size());
+        leader = new Leader(actorContext);
 
-            Object o = objectList.get(0);
-            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertEquals(2, objectList.size());
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        assertEquals(2, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            assertEquals(3, objectList.size());
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                "follower-reply", installSnapshot.getChunkIndex(), true));
+        // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+        followerActor.underlyingActor().clear();
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
 
-            objectList = MessageCollectorActor.getAllMatching(followerActor,
-                InstallSnapshotMessages.InstallSnapshot.class);
+        installSnapshot = MessageCollectorActor.getFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            // Count should still stay at 3
-            assertEquals(3, objectList.size());
-        }};
+        Assert.assertNull(installSnapshot);
     }
 
 
     @Test
     public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
-        new JavaTestKit(getSystem()) {{
-
-            TestActorRef<MessageCollectorActor> followerActor =
-                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put(followerActor.path().toString(),
-                    followerActor.path().toString());
+        logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
 
-            final int followersLastIndex = 2;
-            final int snapshotIndex = 3;
-            final int snapshotTerm = 1;
-            final int currentTerm = 2;
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-            MockRaftActorContext actorContext =
-                    (MockRaftActorContext) createActorContext();
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-            actorContext.setConfigParams(new DefaultConfigParamsImpl(){
-                @Override
-                public int getSnapshotChunkSize() {
-                    return 50;
-                }
-            });
-            actorContext.setPeerAddresses(peerAddresses);
-            actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
 
-            MockLeader leader = new MockLeader(actorContext);
+        actorContext.setCommitIndex(followersLastIndex);
 
-            Map<String, String> leadersSnapshot = new HashMap<>();
-            leadersSnapshot.put("1", "A");
-            leadersSnapshot.put("2", "B");
-            leadersSnapshot.put("3", "C");
+        leader = new Leader(actorContext);
 
-            // set the snapshot variables in replicatedlog
-            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-            ByteString bs = toByteString(leadersSnapshot);
-            leader.setSnapshot(Optional.of(bs));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-            MessageCollectorActor.getAllMatching(followerActor,
-                    InstallSnapshotMessages.InstallSnapshot.class);
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-            InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
-                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-            assertNotNull(installSnapshot);
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
 
-            followerActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
-                followerActor.path().toString(), -1, false));
+        leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                FOLLOWER_ID, -1, false));
 
-            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+        Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
-
-            installSnapshot = MessageCollectorActor.getFirstMatching(
-                    followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-            assertNotNull(installSnapshot);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            assertEquals(1, installSnapshot.getChunkIndex());
-            assertEquals(3, installSnapshot.getTotalChunks());
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-            followerActor.tell(PoisonPill.getInstance(), getRef());
-        }};
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
     }
 
     @Test
     public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-                TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
-
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put(followerActor.path().toString(),
-                        followerActor.path().toString());
+        logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
 
-                final int followersLastIndex = 2;
-                final int snapshotIndex = 3;
-                final int snapshotTerm = 1;
-                final int currentTerm = 2;
+        MockRaftActorContext actorContext = createActorContextWithFollower();
 
-                MockRaftActorContext actorContext =
-                        (MockRaftActorContext) createActorContext();
+        final int followersLastIndex = 2;
+        final int snapshotIndex = 3;
+        final int snapshotTerm = 1;
+        final int currentTerm = 2;
 
-                actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-                    @Override
-                    public int getSnapshotChunkSize() {
-                        return 50;
-                    }
-                });
-                actorContext.setPeerAddresses(peerAddresses);
-                actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+            @Override
+            public int getSnapshotChunkSize() {
+                return 50;
+            }
+        });
 
-                MockLeader leader = new MockLeader(actorContext);
+        actorContext.setCommitIndex(followersLastIndex);
 
-                Map<String, String> leadersSnapshot = new HashMap<>();
-                leadersSnapshot.put("1", "A");
-                leadersSnapshot.put("2", "B");
-                leadersSnapshot.put("3", "C");
+        leader = new Leader(actorContext);
 
-                // set the snapshot variables in replicatedlog
-                actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
-                actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-                actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
 
-                ByteString bs = toByteString(leadersSnapshot);
-                leader.setSnapshot(Optional.of(bs));
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
-                leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        ByteString bs = toByteString(leadersSnapshot);
+        leader.setSnapshot(Optional.of(bs));
 
-                InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.getFirstMatching(
-                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-                assertNotNull(installSnapshot);
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
 
-                assertEquals(1, installSnapshot.getChunkIndex());
-                assertEquals(3, installSnapshot.getTotalChunks());
-                assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
+        InstallSnapshotMessages.InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-                int hashCode = installSnapshot.getData().hashCode();
+        assertEquals(1, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode());
 
-                followerActor.underlyingActor().clear();
+        int hashCode = installSnapshot.getData().hashCode();
 
-                leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
+        followerActor.underlyingActor().clear();
 
-                installSnapshot = MessageCollectorActor.getFirstMatching(
-                        followerActor, InstallSnapshotMessages.InstallSnapshot.class);
-                assertNotNull(installSnapshot);
+        leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
+                FOLLOWER_ID, 1, true));
 
-                assertEquals(2, installSnapshot.getChunkIndex());
-                assertEquals(3, installSnapshot.getTotalChunks());
-                assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
+        installSnapshot = MessageCollectorActor.expectFirstMatching(
+                followerActor, InstallSnapshotMessages.InstallSnapshot.class);
 
-                followerActor.tell(PoisonPill.getInstance(), getRef());
-            }};
+        assertEquals(2, installSnapshot.getChunkIndex());
+        assertEquals(3, installSnapshot.getTotalChunks());
+        assertEquals(hashCode, installSnapshot.getLastChunkHashCode());
     }
 
     @Test
     public void testFollowerToSnapshotLogic() {
+        logStart("testFollowerToSnapshotLogic");
 
-        MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+        MockRaftActorContext actorContext = createActorContext();
 
         actorContext.setConfigParams(new DefaultConfigParamsImpl() {
             @Override
@@ -847,7 +698,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             }
         });
 
-        MockLeader leader = new MockLeader(actorContext);
+        leader = new Leader(actorContext);
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -857,7 +708,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        leader.createFollowerToSnapshot("followerId", bs);
+        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+
         assertEquals(bs.size(), barray.length);
 
         int chunkIndex=0;
@@ -869,35 +722,46 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                 j = barray.length;
             }
 
-            ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
+            ByteString chunk = fts.getNextChunk();
             assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
-            assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+            assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
-            leader.getFollowerToSnapshot().markSendStatus(true);
-            if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
-                leader.getFollowerToSnapshot().incrementChunkIndex();
+            fts.markSendStatus(true);
+            if (!fts.isLastChunk(chunkIndex)) {
+                fts.incrementChunkIndex();
             }
         }
 
-        assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
+        assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
     }
 
-
     @Override protected RaftActorBehavior createBehavior(
         RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
-    @Override protected RaftActorContext createActorContext() {
+    @Override
+    protected MockRaftActorContext createActorContext() {
         return createActorContext(leaderActor);
     }
 
     @Override
-    protected RaftActorContext createActorContext(ActorRef actorRef) {
+    protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+        return createActorContext("leader", actorRef);
+    }
+
+    private MockRaftActorContext createActorContextWithFollower() {
+        MockRaftActorContext actorContext = createActorContext();
+        actorContext.setPeerAddresses(ImmutableMap.<String,String>builder().put(FOLLOWER_ID,
+                followerActor.path().toString()).build());
+        return actorContext;
+    }
+
+    private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        MockRaftActorContext context = new MockRaftActorContext("test", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
         context.setConfigParams(configParams);
         return context;
     }
@@ -945,310 +809,248 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(ForwardMessageToBehaviorActor.class));
+        logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower", getSystem(), followerActor);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower", followerActor.path().toString());
+        leaderActorContext.setPeerAddresses(peerAddresses);
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
+        //create 3 entries
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            //create 3 entries
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
 
-            leaderActorContext.setCommitIndex(1);
+        followerActorContext.getReplicatedLog().removeFrom(0);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
+        // follower too has the exact same log entries and has the same commit index
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            // follower too has the exact same log entries and has the same commit index
-            followerActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        followerActorContext.setCommitIndex(1);
 
-            followerActorContext.setCommitIndex(1);
+        leader = new Leader(leaderActorContext);
 
-            Leader leader = new Leader(leaderActorContext);
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(0, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(0, appendEntries.getPrevLogIndex());
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        // follower returns its next index
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            // follower returns its next index
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
-        }};
+        follower.close();
     }
 
-
     @Test
     public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<ForwardMessageToBehaviorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(ForwardMessageToBehaviorActor.class));
+        logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
 
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower", getSystem(), followerActor);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
+        Map<String, String> peerAddresses = new HashMap<>();
+        peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower", followerActor.path().toString());
+        leaderActorContext.setPeerAddresses(peerAddresses);
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
 
-            leaderActorContext.setCommitIndex(1);
+        followerActorContext.getReplicatedLog().removeFrom(0);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
+        followerActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-            followerActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        // follower has the same log entries but its commit index > leaders commit index
+        followerActorContext.setCommitIndex(2);
 
-            // follower has the same log entries but its commit index > leaders commit index
-            followerActorContext.setCommitIndex(2);
+        leader = new Leader(leaderActorContext);
 
-            Leader leader = new Leader(leaderActorContext);
+        // Initial heartbeat
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            // Initial heartbeat
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(0, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(0, appendEntries.getPrevLogIndex());
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        leaderActor.underlyingActor().behavior = leader;
+        leader.handleMessage(followerActor, appendEntriesReply);
 
-            leaderActor.underlyingActor().behavior = leader;
-            leader.handleMessage(followerActor, appendEntriesReply);
+        leaderActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            leaderActor.underlyingActor().clear();
-            followerActor.underlyingActor().clear();
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                    TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        assertEquals(2, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().size());
+        assertEquals(2, appendEntries.getPrevLogIndex());
 
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().size());
-            assertEquals(2, appendEntries.getPrevLogIndex());
+        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        assertEquals(2, appendEntriesReply.getLogLastIndex());
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
 
-            assertEquals(2, appendEntriesReply.getLogLastIndex());
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
+        assertEquals(2, followerActorContext.getCommitIndex());
 
-            assertEquals(1, followerActorContext.getCommitIndex());
-        }};
+        follower.close();
     }
 
     @Test
     public void testHandleAppendEntriesReplyFailure(){
-        new JavaTestKit(getSystem()) {
-            {
+        logStart("testHandleAppendEntriesReplyFailure");
 
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-                ActorRef followerActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        leader = new Leader(leaderActorContext);
 
+        // Send initial heartbeat reply with last index.
+        leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1));
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals("getNextIndex", 11, followerInfo.getNextIndex());
 
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put("follower-1",
-                    followerActor.path().toString());
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1);
 
-                leaderActorContext.setPeerAddresses(peerAddresses);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                Leader leader = new Leader(leaderActorContext);
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
-
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
-
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-
-            }};
+        assertEquals("getNextIndex", 10, followerInfo.getNextIndex());
     }
 
     @Test
     public void testHandleAppendEntriesReplySuccess() throws Exception {
-        new JavaTestKit(getSystem()) {
-            {
-
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
-                ActorRef followerActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
-
+        logStart("testHandleAppendEntriesReplySuccess");
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-                leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
 
-                Map<String, String> peerAddresses = new HashMap<>();
-                peerAddresses.put("follower-1",
-                    followerActor.path().toString());
+        leaderActorContext.setCommitIndex(1);
+        leaderActorContext.setLastApplied(1);
+        leaderActorContext.getTermInformation().update(1, "leader");
 
-                leaderActorContext.setPeerAddresses(peerAddresses);
-                leaderActorContext.setCommitIndex(1);
-                leaderActorContext.setLastApplied(1);
-                leaderActorContext.getTermInformation().update(1, "leader");
+        leader = new Leader(leaderActorContext);
 
-                Leader leader = new Leader(leaderActorContext);
+        AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1);
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
+        assertEquals(2, leaderActorContext.getCommitIndex());
 
-                assertEquals(2, leaderActorContext.getCommitIndex());
+        ApplyLogEntries applyLogEntries = MessageCollectorActor.expectFirstMatching(
+                leaderActor, ApplyLogEntries.class);
 
-                ApplyLogEntries applyLogEntries =
-                    MessageCollectorActor.getFirstMatching(leaderActor,
-                    ApplyLogEntries.class);
+        assertEquals(2, leaderActorContext.getLastApplied());
 
-                assertNotNull(applyLogEntries);
+        assertEquals(2, applyLogEntries.getToIndex());
 
-                assertEquals(2, leaderActorContext.getLastApplied());
+        List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+                ApplyState.class);
 
-                assertEquals(2, applyLogEntries.getToIndex());
+        assertEquals(1,applyStateList.size());
 
-                List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
-                    ApplyState.class);
+        ApplyState applyState = applyStateList.get(0);
 
-                assertEquals(1,applyStateList.size());
-
-                ApplyState applyState = (ApplyState) applyStateList.get(0);
-
-                assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
-
-            }};
+        assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
     }
 
     @Test
     public void testHandleAppendEntriesReplyUnknownFollower(){
-        new JavaTestKit(getSystem()) {
-            {
+        logStart("testHandleAppendEntriesReplyUnknownFollower");
 
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        leader = new Leader(leaderActorContext);
 
-                Leader leader = new Leader(leaderActorContext);
+        AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1);
 
-                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+        RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
 
-                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
-
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-
-            }};
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
     @Test
     public void testHandleRequestVoteReply(){
-        new JavaTestKit(getSystem()) {
-            {
-
-                ActorRef leaderActor =
-                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+        logStart("testHandleRequestVoteReply");
 
-                MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-                Leader leader = new Leader(leaderActorContext);
+        leader = new Leader(leaderActorContext);
 
-                RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+        // Should be a no-op.
+        RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+                new RequestVoteReply(1, true));
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
 
-                raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+        raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
 
-                assertEquals(RaftState.Leader, raftActorBehavior.state());
-            }};
+        assertEquals(RaftState.Leader, raftActorBehavior.state());
     }
 
     @Test
     public void testIsolatedLeaderCheckNoFollowers() {
-        new JavaTestKit(getSystem()) {{
-            ActorRef leaderActor = getTestActor();
-
-            MockRaftActorContext leaderActorContext =
-                new MockRaftActorContext("leader", getSystem(), leaderActor);
+        logStart("testIsolatedLeaderCheckNoFollowers");
 
-            Map<String, String> peerAddresses = new HashMap<>();
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        MockRaftActorContext leaderActorContext = createActorContext();
 
-            Leader leader = new Leader(leaderActorContext);
-            RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-            Assert.assertTrue(behavior instanceof Leader);
-        }};
+        leader = new Leader(leaderActorContext);
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+        Assert.assertTrue(behavior instanceof Leader);
     }
 
     @Test
     public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+        logStart("testIsolatedLeaderCheckTwoFollowers");
+
         new JavaTestKit(getSystem()) {{
 
             ActorRef followerActor1 = getTestActor();
             ActorRef followerActor2 = getTestActor();
 
-            MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+            MockRaftActorContext leaderActorContext = createActorContext();
 
             Map<String, String> peerAddresses = new HashMap<>();
             peerAddresses.put("follower-1", followerActor1.path().toString());
@@ -1256,7 +1058,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
             leaderActorContext.setPeerAddresses(peerAddresses);
 
-            Leader leader = new Leader(leaderActorContext);
+            leader = new Leader(leaderActorContext);
             leader.stopIsolatedLeaderCheckSchedule();
 
             leader.markFollowerActive("follower-1");
@@ -1289,118 +1091,83 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
             Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
                 behavior instanceof IsolatedLeader);
-
         }};
     }
 
 
     @Test
     public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
-        new JavaTestKit(getSystem()) {{
-            TestActorRef<MessageCollectorActor> leaderActor = TestActorRef.create(getSystem(),
-                    Props.create(MessageCollectorActor.class));
-
-            MockRaftActorContext leaderActorContext =
-                    new MockRaftActorContext("leader", getSystem(), leaderActor);
-
-            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
-            //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
-            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
-
-            leaderActorContext.setConfigParams(configParams);
+        logStart("testAppendEntryCallAtEndofAppendEntryReply");
 
-            TestActorRef<ForwardMessageToBehaviorActor> followerActor = TestActorRef.create(getSystem(),
-                    ForwardMessageToBehaviorActor.props());
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
 
-            MockRaftActorContext followerActorContext =
-                    new MockRaftActorContext("follower-reply", getSystem(), followerActor);
-
-            followerActorContext.setConfigParams(configParams);
-
-            Follower follower = new Follower(followerActorContext);
-            followerActor.underlyingActor().behavior = follower;
-
-            Map<String, String> peerAddresses = new HashMap<>();
-            peerAddresses.put("follower-reply",
-                    followerActor.path().toString());
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+        configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
-            leaderActorContext.setPeerAddresses(peerAddresses);
+        leaderActorContext.setConfigParams(configParams);
 
-            leaderActorContext.getReplicatedLog().removeFrom(0);
-            leaderActorContext.setCommitIndex(-1);
-            leaderActorContext.setLastApplied(-1);
+        MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
 
-            followerActorContext.getReplicatedLog().removeFrom(0);
-            followerActorContext.setCommitIndex(-1);
-            followerActorContext.setLastApplied(-1);
+        followerActorContext.setConfigParams(configParams);
 
-            Leader leader = new Leader(leaderActorContext);
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().behavior = follower;
 
-            AppendEntriesReply appendEntriesReply = MessageCollectorActor.getFirstMatching(
-                    leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
-            System.out.println("appendEntriesReply: "+appendEntriesReply);
-            leader.handleMessage(followerActor, appendEntriesReply);
+        leaderActorContext.getReplicatedLog().removeFrom(0);
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
 
-            // Clear initial heartbeat messages
+        followerActorContext.getReplicatedLog().removeFrom(0);
+        followerActorContext.setCommitIndex(-1);
+        followerActorContext.setLastApplied(-1);
 
-            leaderActor.underlyingActor().clear();
-            followerActor.underlyingActor().clear();
+        leader = new Leader(leaderActorContext);
 
-            // create 3 entries
-            leaderActorContext.setReplicatedLog(
-                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
-            leaderActorContext.setCommitIndex(1);
-            leaderActorContext.setLastApplied(1);
+        AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+                leaderActor, AppendEntriesReply.class);
 
-            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
-                    TimeUnit.MILLISECONDS);
+        leader.handleMessage(followerActor, appendEntriesReply);
 
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+        // Clear initial heartbeat messages
 
-            AppendEntries appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        leaderActor.underlyingActor().clear();
+        followerActor.underlyingActor().clear();
 
-            // Should send first log entry
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(0, appendEntries.getEntries().get(0).getIndex());
-            assertEquals(-1, appendEntries.getPrevLogIndex());
+        // create 3 entries
+        leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+        leaderActorContext.setCommitIndex(1);
+        leaderActorContext.setLastApplied(1);
 
-            appendEntriesReply = MessageCollectorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
-            assertNotNull(appendEntriesReply);
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
-            assertEquals(1, appendEntriesReply.getLogLastTerm());
-            assertEquals(0, appendEntriesReply.getLogLastIndex());
+        leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            followerActor.underlyingActor().clear();
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+        // Should send first log entry
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(0, appendEntries.getEntries().get(0).getIndex());
+        assertEquals(-1, appendEntries.getPrevLogIndex());
 
-            appendEntries = MessageCollectorActor.getFirstMatching(followerActor, AppendEntries.class);
-            assertNotNull(appendEntries);
+        appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
-            // Should send second log entry
-            assertEquals(1, appendEntries.getLeaderCommit());
-            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
-        }};
-    }
+        assertEquals(1, appendEntriesReply.getLogLastTerm());
+        assertEquals(0, appendEntriesReply.getLogLastIndex());
 
-    class MockLeader extends Leader {
+        followerActor.underlyingActor().clear();
 
-        FollowerToSnapshot fts;
+        leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
 
-        public MockLeader(RaftActorContext context){
-            super(context);
-        }
+        appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        public FollowerToSnapshot getFollowerToSnapshot() {
-            return fts;
-        }
+        // Should send second log entry
+        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(1, appendEntries.getEntries().get(0).getIndex());
 
-        public void createFollowerToSnapshot(String followerId, ByteString bs ) {
-            fts = new FollowerToSnapshot(bs);
-            setFollowerSnapshot(followerId, fts);
-        }
+        follower.close();
     }
 
     private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
index 79c90cf051cc928ac50e563a136c62030809bbc2..448c28e8c8e9f090dacdbab184b7760f85c66893 100644 (file)
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.junit.Assert;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -63,29 +64,39 @@ public class MessageCollectorActor extends UntypedActor {
      * @return
      */
     public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
-        for(int i = 0; i < 50; i++) {
-            List<Object> allMessages = getAllMessages(actor);
+        List<Object> allMessages = getAllMessages(actor);
 
-            for(Object message : allMessages){
-                if(message.getClass().equals(clazz)){
-                    return (T) message;
-                }
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                return (T) message;
+            }
+        }
+
+        return null;
+    }
+
+    public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+        for(int i = 0; i < 50; i++) {
+            T message = getFirstMatching(actor, clazz);
+            if(message != null) {
+                return message;
             }
 
             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
         }
 
+        Assert.fail("Did not receive message of type " + clazz);
         return null;
     }
 
-    public static List<Object> getAllMatching(ActorRef actor, Class<?> clazz) throws Exception {
+    public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
         List<Object> allMessages = getAllMessages(actor);
 
-        List<Object> output = Lists.newArrayList();
+        List<T> output = Lists.newArrayList();
 
         for(Object message : allMessages){
             if(message.getClass().equals(clazz)){
-                output.add(message);
+                output.add((T) message);
             }
         }