Bug 6540: Move LeaderInstallSnapshotState to FollowerLogInformation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index b51f0a70b1dc7ef54da2f837b55354fdffaf02af..9e91e514d7efcd62579dbf38ab3e7433cb93541a 100644 (file)
@@ -9,7 +9,10 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -23,13 +26,13 @@ import akka.testkit.TestActorRef;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -37,33 +40,33 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
-import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
 import scala.concurrent.duration.FiniteDuration;
 
-public class LeaderTest extends AbstractLeaderTest {
+public class LeaderTest extends AbstractLeaderTest<Leader> {
 
     static final String FOLLOWER_ID = "follower";
     public static final String LEADER_ID = "leader";
@@ -93,10 +96,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(createActorContext());
 
-        // handle message should return the Leader state when it receives an
-        // unknown message
-        RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
-        Assert.assertTrue(behavior instanceof Leader);
+        // handle message should null when it receives an unknown message
+        assertNull(leader.handleMessage(followerActor, "foo"));
     }
 
     @Test
@@ -104,6 +105,7 @@ public class LeaderTest extends AbstractLeaderTest {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
         short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
@@ -111,6 +113,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(term, "");
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader should send an immediate heartbeat with no entries as follower is inactive.
         long lastIndex = actorContext.getReplicatedLog().lastIndex();
@@ -132,7 +135,7 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
                 getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
@@ -145,9 +148,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        return sendReplicate(actorContext, 1, index);
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, index, payload);
+                term, index, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
@@ -189,6 +196,60 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
     }
 
+    @Test
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+        logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
+
+        // The raft context is initialized with a couple log entries. However the commitIndex
+        // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+        // committed and applied. Now it regains leadership with a higher term (2).
+        long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+        long newTerm = prevTerm + 1;
+        actorContext.getTermInformation().update(newTerm, "");
+
+        leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower replies with the leader's current last index and term, simulating that it is
+        // up to date with the leader.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+        // The commit index should not get updated even though consensus was reached. This is b/c the
+        // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+        // from previous terms by counting replicas".
+        assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+        followerActor.underlyingActor().clear();
+
+        // Now replicate a new entry with the new term 2.
+        long newIndex = lastIndex + 1;
+        sendReplicate(actorContext, newTerm, newIndex);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+        // The follower replies with success. The leader should now update the commit index to the new index
+        // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+        // prior entries are committed indirectly".
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+        assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+    }
+
     @Test
     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
@@ -350,7 +411,7 @@ public class LeaderTest extends AbstractLeaderTest {
         // Wait slightly longer than heartbeat duration
         Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
         assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
@@ -392,7 +453,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         for(int i=0;i<3;i++) {
             Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+            leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
@@ -428,7 +489,7 @@ public class LeaderTest extends AbstractLeaderTest {
         followerActor.underlyingActor().clear();
 
         Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         sendReplicate(actorContext, lastIndex+1);
 
         List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
@@ -456,8 +517,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         actorContext.getReplicatedLog().append(newEntry);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new Replicate(leaderActor, "state-id", newEntry));
+        final Identifier id = new MockIdentifier("state-id");
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry));
 
         // State should not change
         assertTrue(raftBehavior instanceof Leader);
@@ -478,7 +539,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         ApplyState last = applyStateList.get((int) newLogIndex - 1);
         assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
-        assertEquals("getIdentifier", "state-id", last.getIdentifier());
+        assertEquals("getIdentifier", id, last.getIdentifier());
     }
 
     @Test
@@ -524,8 +585,9 @@ public class LeaderTest extends AbstractLeaderTest {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
         fts.getNextChunk();
@@ -534,18 +596,16 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
-
-        AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
-        AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+        AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
         //InstallSnapshotReply received
         fts.markSendStatus(true);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -594,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
         RaftActorBehavior raftBehavior = leader.handleMessage(
-                leaderActor, new Replicate(null, "state-id", entry));
+                leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -639,7 +699,7 @@ public class LeaderTest extends AbstractLeaderTest {
         //update follower timestamp
         leader.markFollowerActive(FOLLOWER_ID);
 
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -652,9 +712,9 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(2, cs.getLastTerm());
 
         // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
 
-        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
     @Test
@@ -713,10 +773,10 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it shouldnt initiate Capture
-        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+        // if an initiate is started again when first is in progress, it should not initiate Capture
+        leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry));
 
-        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
 
@@ -837,6 +897,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
@@ -858,8 +919,9 @@ public class LeaderTest extends AbstractLeaderTest {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
@@ -873,12 +935,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        assertEquals(0, leader.followerSnapshotSize());
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
+        assertNull(fli.getInstallSnapshotState());
         assertEquals(commitIndex, fli.getMatchIndex());
         assertEquals(commitIndex + 1, fli.getNextIndex());
+        assertFalse(leader.hasSnapshot());
     }
 
     @Test
@@ -905,6 +968,7 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
         leader.getFollower(FOLLOWER_ID).setNextIndex(0);
@@ -953,7 +1017,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
 
-        Assert.assertNull(installSnapshot);
+        assertNull(installSnapshot);
     }
 
 
@@ -1013,7 +1077,7 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -1067,9 +1131,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+                installSnapshot.getLastChunkHashCode().get().intValue());
 
-        int hashCode = installSnapshot.getData().hashCode();
+        int hashCode = Arrays.hashCode(installSnapshot.getData());
 
         followerActor.underlyingActor().clear();
 
@@ -1084,19 +1149,8 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Test
-    public void testFollowerToSnapshotLogic() {
-        logStart("testFollowerToSnapshotLogic");
-
-        MockRaftActorContext actorContext = createActorContext();
-
-        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-            @Override
-            public int getSnapshotChunkSize() {
-                return 50;
-            }
-        });
-
-        leader = new Leader(actorContext);
+    public void testLeaderInstallSnapshotState() {
+        logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -1106,8 +1160,7 @@ public class LeaderTest extends AbstractLeaderTest {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
 
         assertEquals(bs.size(), barray.length);
 
@@ -1120,8 +1173,8 @@ public class LeaderTest extends AbstractLeaderTest {
                 j = barray.length;
             }
 
-            ByteString chunk = fts.getNextChunk();
-            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+            byte[] chunk = fts.getNextChunk();
+            assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length);
             assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
 
             fts.markSendStatus(true);
@@ -1133,8 +1186,8 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
     }
 
-    @Override protected RaftActorBehavior createBehavior(
-        RaftActorContext actorContext) {
+    @Override
+    protected Leader createBehavior(final RaftActorContext actorContext) {
         return new Leader(actorContext);
     }
 
@@ -1184,6 +1237,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         Map<String, String> peerAddresses = new HashMap<>();
         peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -1207,6 +1261,7 @@ public class LeaderTest extends AbstractLeaderTest {
         followerActorContext.setCommitIndex(1);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -1238,6 +1293,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         Map<String, String> leaderPeerAddresses = new HashMap<>();
         leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
@@ -1283,7 +1339,7 @@ public class LeaderTest extends AbstractLeaderTest {
         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
                 TimeUnit.MILLISECONDS);
 
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -1349,7 +1405,7 @@ public class LeaderTest extends AbstractLeaderTest {
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
@@ -1405,6 +1461,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1420,6 +1477,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1484,6 +1542,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1499,6 +1558,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1602,7 +1662,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
         short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
@@ -1629,8 +1692,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
 
-        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(2, followerInfo.getMatchIndex());
+        assertEquals(3, followerInfo.getNextIndex());
         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+        assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
     }
 
     @Test
@@ -1676,6 +1741,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Follower follower = new Follower(followerActorContext);
         followerActor.underlyingActor().setBehavior(follower);
+        followerActorContext.setCurrentBehavior(follower);
 
         leader = new Leader(leaderActorContext);
 
@@ -1691,6 +1757,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
 
         leaderActor.underlyingActor().setBehavior(leader);
+        leaderActorContext.setCurrentBehavior(leader);
 
         leader.handleMessage(followerActor, appendEntriesReply);
 
@@ -1711,7 +1778,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         appendEntries = appendEntriesList.get(1);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
@@ -1756,8 +1823,27 @@ public class LeaderTest extends AbstractLeaderTest {
         MockRaftActorContext leaderActorContext = createActorContext();
 
         leader = new Leader(leaderActorContext);
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-        Assert.assertTrue(behavior instanceof Leader);
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue(behavior instanceof Leader);
+    }
+
+    @Test
+    public void testIsolatedLeaderCheckNoVotingFollowers() {
+        logStart("testIsolatedLeaderCheckNoVotingFollowers");
+
+        MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+        Follower follower = new Follower(followerActorContext);
+        followerActor.underlyingActor().setBehavior(follower);
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING);
+
+        leader = new Leader(leaderActorContext);
+        leader.getFollower(FOLLOWER_ID).markFollowerActive();
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Expected Leader", behavior instanceof Leader);
     }
 
     private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
@@ -1777,9 +1863,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.markFollowerActive("follower-1");
         leader.markFollowerActive("follower-2");
-        RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-        Assert.assertTrue("Behavior not instance of Leader when all followers are active",
-                behavior instanceof Leader);
+        RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader);
 
         // kill 1 follower and verify if that got killed
         final JavaTestKit probe = new JavaTestKit(getSystem());
@@ -1790,9 +1875,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader.markFollowerInActive("follower-1");
         leader.markFollowerActive("follower-2");
-        behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
-        Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
-                behavior instanceof Leader);
+        behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
+        assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader);
 
         // kill 2nd follower and leader should change to Isolated leader
         followerActor2.tell(PoisonPill.getInstance(), null);
@@ -1802,7 +1886,7 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(termMsg2.getActor(), followerActor2);
 
         leader.markFollowerInActive("follower-2");
-        return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+        return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK);
     }
 
     @Test
@@ -1811,7 +1895,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
 
-        Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+        assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
             behavior instanceof IsolatedLeader);
     }
 
@@ -1821,7 +1905,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
 
-        Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+        assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
                 behavior instanceof Leader);
     }
 
@@ -1894,6 +1978,7 @@ public class LeaderTest extends AbstractLeaderTest {
                 new FiniteDuration(1000, TimeUnit.SECONDS));
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
@@ -1902,6 +1987,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Ignore initial heartbeats
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -1943,7 +2029,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
 
         // Send reply from the voting follower and verify consensus.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
 
         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
     }
@@ -1958,6 +2044,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -1982,7 +2069,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
 
         // Leader should force an election timeout
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
         verify(mockTransferCohort).transferComplete();
     }
@@ -1997,6 +2084,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2014,7 +2102,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // Leader should force an election timeout
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
         verify(mockTransferCohort).transferComplete();
     }
@@ -2028,6 +2116,7 @@ public class LeaderTest extends AbstractLeaderTest {
                 new FiniteDuration(200, TimeUnit.MILLISECONDS));
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2045,12 +2134,12 @@ public class LeaderTest extends AbstractLeaderTest {
 
         Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
                 getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
-        leader.handleMessage(leaderActor, new SendHeartBeat());
+        leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
         leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
 
         // Leader should force an election timeout
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
 
         verify(mockTransferCohort).transferComplete();
     }
@@ -2066,6 +2155,7 @@ public class LeaderTest extends AbstractLeaderTest {
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
 
         leader = new Leader(leaderActorContext);
+        leaderActorContext.setCurrentBehavior(leader);
 
         // Initial heartbeat
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -2086,7 +2176,7 @@ public class LeaderTest extends AbstractLeaderTest {
         for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
             Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
                     getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
-            leader.handleMessage(leaderActor, new SendHeartBeat());
+            leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
         }
 
         verify(mockTransferCohort).abortTransfer();
@@ -2095,7 +2185,7 @@ public class LeaderTest extends AbstractLeaderTest {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
         assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());