Make private methods static
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 4e8e7fe11bad4fdeb085ffc1ac60450cbbfeda11..1b15ecb135a89f87c212ecf3a080cf8181ba921e 100644 (file)
@@ -1,24 +1,43 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -27,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
@@ -38,6 +58,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private RaftActorBehavior follower;
 
+    private final short payloadVersion = 5;
+
     @Override
     @After
     public void tearDown() throws Exception {
@@ -50,7 +72,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new Follower(actorContext);
+        return new TestFollower(actorContext);
     }
 
     @Override
@@ -60,7 +82,16 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
-        return new MockRaftActorContext("follower", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+        context.setPayloadVersion(payloadVersion );
+        return context;
+    }
+
+    private static int getElectionTimeoutCount(RaftActorBehavior follower){
+        if(follower instanceof TestFollower){
+            return ((TestFollower) follower).getElectionTimeoutCount();
+        }
+        return -1;
     }
 
     @Test
@@ -99,6 +130,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         assertEquals("isVoteGranted", true, reply.isVoteGranted());
         assertEquals("getTerm", term, reply.getTerm());
+        assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
     }
 
     @Test
@@ -116,8 +148,283 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
 
         assertEquals("isVoteGranted", false, reply.isVoteGranted());
+        assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
+    }
+
+
+    @Test
+    public void testHandleFirstAppendEntries() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        Assert.assertEquals(1, context.getReplicatedLog().size());
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 105, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleSyncUpAppendEntries() throws Exception {
+        logStart("testHandleSyncUpAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        followerActor.underlyingActor().clear();
+
+        // Sending the same message again should not generate another message
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertNull(syncStatus);
+
     }
 
+    @Test
+    public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
+    @Test
+    public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
+        logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(100);
+        setLastLogEntry(context, 1, 100,
+                new MockRaftActorContext.MockPayload(""));
+
+        entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // leader-2 is becoming the leader now and it says the commitIndex is 45
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        // We get a new message saying initial status is not done
+        assertFalse(syncStatus.isInitialSyncDone());
+
+    }
+
+
     /**
      * This test verifies that when an AppendEntries RPC is received by a RaftActor
      * with a commitIndex that is greater than what has been applied to the
@@ -141,7 +448,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -167,7 +474,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -215,7 +522,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+        short leaderPayloadVersion = 10;
+        String leaderId = "leader-1";
+        AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
 
         follower = createBehavior(context);
 
@@ -227,6 +536,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("Entry 3", entries.get(0), log.get(3));
         assertEquals("Entry 4", entries.get(1), log.get(4));
 
+        assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
+        assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
+
         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
@@ -262,7 +574,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -287,6 +599,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
     }
 
+    @Test
+    public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+        logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
+
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
+
+        context.setReplicatedLog(log);
+
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+        entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
+
+        context.setRaftPolicy(createRaftPolicy(false, true));
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+    }
+
     @Test
     public void testHandleAppendEntriesPreviousLogEntryMissing(){
         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
@@ -305,7 +655,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -336,7 +686,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
 
         assertEquals("Next index", 2, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -348,7 +698,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
         leaderActor.underlyingActor().clear();
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -358,7 +708,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleAppendAfterInstallingSnapshot(){
+    public void testHandleAppendEntriesAfterInstallingSnapshot(){
         logStart("testHandleAppendAfterInstallingSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -376,7 +726,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
 
         follower = createBehavior(context);
 
@@ -399,15 +749,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1, "leader");
 
         follower = createBehavior(context);
 
-        HashMap<String, String> followerSnapshot = new HashMap<>();
-        followerSnapshot.put("1", "A");
-        followerSnapshot.put("2", "B");
-        followerSnapshot.put("3", "C");
-
-        ByteString bsSnapshot  = toByteString(followerSnapshot);
+        ByteString bsSnapshot  = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
@@ -429,6 +775,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 ApplySnapshot.class);
         Snapshot snapshot = applySnapshot.getSnapshot();
+        assertNotNull(lastInstallSnapshot);
         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
                 snapshot.getLastAppliedTerm());
@@ -436,6 +783,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+        applySnapshot.getCallback().onSuccess();
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);
@@ -449,7 +799,110 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
         }
 
-        Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+    }
+
+
+    /**
+     * Verify that when an AppendEntries is sent to a follower during a snapshot install
+     * the Follower short-circuits the processing of the AppendEntries message.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(((Follower) follower).getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(((Follower) follower).getSnapshotTracker());
+
+        // Send an append entry
+        AppendEntries appendEntries = mock(AppendEntries.class);
+        doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+        assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+        assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+        // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
+        verify(appendEntries, never()).getPrevLogIndex();
+
+    }
+
+    @Test
+    public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
+        logStart("testInitialSyncUpWithHandleInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int offset = 0;
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+        int chunkIndex = 1;
+        InstallSnapshot lastInstallSnapshot = null;
+
+        for(int i = 0; i < totalChunks; i++) {
+            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                    chunkData, chunkIndex, totalChunks);
+            follower.handleMessage(leaderActor, lastInstallSnapshot);
+            offset = offset + 50;
+            lastIncludedIndex++;
+            chunkIndex++;
+        }
+
+        FollowerInitialSyncUpStatus syncStatus =
+                MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+
+        // Clear all the messages
+        followerActor.underlyingActor().clear();
+
+        context.setLastApplied(101);
+        context.setCommitIndex(101);
+        setLastLogEntry(context, 1, 101,
+                new MockRaftActorContext.MockPayload(""));
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+
+        assertTrue(syncStatus.isInitialSyncDone());
     }
 
     @Test
@@ -460,12 +913,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        HashMap<String, String> followerSnapshot = new HashMap<>();
-        followerSnapshot.put("1", "A");
-        followerSnapshot.put("2", "B");
-        followerSnapshot.put("3", "C");
-
-        ByteString bsSnapshot = toByteString(followerSnapshot);
+        ByteString bsSnapshot = createSnapshot();
 
         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
@@ -479,7 +927,60 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getTerm", 1, reply.getTerm());
         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
 
-        Assert.assertNull("Expected null SnapshotTracker", ((Follower)follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+    }
+
+    @Test
+    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+        MockRaftActorContext context = createActorContext();
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        follower = createBehavior(context);
+
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+        assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+    }
+
+    @Test
+    public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){
+        MockRaftActorContext context = createActorContext();
+        context.setConfigParams(new DefaultConfigParamsImpl(){
+            @Override
+            public FiniteDuration getElectionTimeOutInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        context.setRaftPolicy(createRaftPolicy(false, false));
+
+        follower = createBehavior(context);
+
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500);
+    }
+
+    @Test
+    public void testElectionScheduledWhenAnyRaftRPCReceived(){
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, new RaftRPC() {
+            @Override
+            public long getTerm() {
+                return 100;
+            }
+        });
+        assertEquals("schedule election", 1, getElectionTimeoutCount(follower));
+    }
+
+    @Test
+    public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, "non-raft-rpc");
+        assertEquals("schedule election", 0, getElectionTimeoutCount(follower));
     }
 
     public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
@@ -498,6 +999,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+        expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+    }
+
+    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+                                                   String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+                                                   boolean expForceInstallSnapshot) {
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
                 AppendEntriesReply.class);
@@ -507,13 +1014,25 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+        assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
     }
 
-    private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+
+    private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
                 new MockRaftActorContext.MockPayload(data));
     }
 
+    private ByteString createSnapshot(){
+        HashMap<String, String> followerSnapshot = new HashMap<>();
+        followerSnapshot.put("1", "A");
+        followerSnapshot.put("2", "B");
+        followerSnapshot.put("3", "C");
+
+        return toByteString(followerSnapshot);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
@@ -529,4 +1048,23 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
         assertEquals("isSuccess", true, reply.isSuccess());
     }
+
+    private static class TestFollower extends Follower {
+
+        int electionTimeoutCount = 0;
+
+        public TestFollower(RaftActorContext context) {
+            super(context);
+        }
+
+        @Override
+        protected void scheduleElection(FiniteDuration interval) {
+            electionTimeoutCount++;
+            super.scheduleElection(interval);
+        }
+
+        public int getElectionTimeoutCount() {
+            return electionTimeoutCount;
+        }
+    }
 }