Fix issue when AE leader differs from prior install snapshot leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 29fb613327f23b72755514ce6f2c256d447f8a83..b01fd33914f9500c8537cc86032aee924b54f7ed 100644 (file)
@@ -1,20 +1,39 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -22,6 +41,7 @@ import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -29,9 +49,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
 
-public class FollowerTest extends AbstractRaftActorBehaviorTest {
+public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
@@ -39,7 +62,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
             Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
 
-    private RaftActorBehavior follower;
+    private Follower follower;
+
+    private final short payloadVersion = 5;
 
     @Override
     @After
@@ -52,8 +77,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Override
-    protected RaftActorBehavior createBehavior(RaftActorContext actorContext) {
-        return new Follower(actorContext);
+    protected Follower createBehavior(RaftActorContext actorContext) {
+        return spy(new Follower(actorContext));
     }
 
     @Override
@@ -63,7 +88,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     @Override
     protected  MockRaftActorContext createActorContext(ActorRef actorRef){
-        return new MockRaftActorContext("follower", getSystem(), actorRef);
+        MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
+        context.setPayloadVersion(payloadVersion );
+        return context;
     }
 
     @Test
@@ -71,26 +98,60 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         MockRaftActorContext actorContext = createActorContext();
         follower = new Follower(actorContext);
 
-        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class,
+        MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class,
                 actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis());
     }
 
     @Test
-    public void testHandleElectionTimeout(){
-        logStart("testHandleElectionTimeout");
+    public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived");
 
-        follower = new Follower(createActorContext());
+        MockRaftActorContext context = createActorContext();
+        follower = new Follower(context);
 
-        RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout());
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
 
         assertTrue(raftBehavior instanceof Candidate);
     }
 
     @Test
-    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){
+    public void testHandleElectionTimeoutWhenLeaderMessageReceived() {
+        logStart("testHandleElectionTimeoutWhenLeaderMessageReceived");
+
+        MockRaftActorContext context = createActorContext();
+        ((DefaultConfigParamsImpl) context.getConfigParams()).
+                setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4);
+
+        follower = new Follower(context);
+        context.setCurrentBehavior(follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+                getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS);
+        RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+
+        Uninterruptibles.sleepUninterruptibly(context.getConfigParams().
+                getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS);
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(),
+                -1, -1, (short) 1));
+
+        Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE);
+        assertTrue(raftBehavior instanceof Follower);
+    }
+
+    @Test
+    public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() {
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, null);
 
@@ -102,13 +163,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         assertEquals("isVoteGranted", true, reply.isVoteGranted());
         assertEquals("getTerm", term, reply.getTerm());
+        verify(follower).scheduleElection(any(FiniteDuration.class));
     }
 
     @Test
     public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){
         logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId");
 
-        RaftActorContext context = createActorContext();
+        MockRaftActorContext context = createActorContext();
         long term = 1000;
         context.getTermInformation().update(term, "test");
 
@@ -119,6 +181,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class);
 
         assertEquals("isVoteGranted", false, reply.isVoteGranted());
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
     }
 
 
@@ -127,19 +190,121 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        Assert.assertEquals(1, context.getReplicatedLog().size());
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar"));
+        context.getReplicatedLog().setSnapshotIndex(99);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 101, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
 
         List<ReplicatedLogEntry> entries = Arrays.asList(
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
 
         FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
 
         assertFalse(syncStatus.isInitialSyncDone());
+        assertTrue("append entries reply should be true", reply.isSuccess());
+    }
+
+    @Test
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception {
+        logStart("testHandleFirstAppendEntries");
+
+        MockRaftActorContext context = createActorContext();
+        context.getReplicatedLog().clear(0,2);
+        context.getReplicatedLog().setSnapshotIndex(100);
+
+        List<ReplicatedLogEntry> entries = Arrays.asList(
+                newReplicatedLogEntry(2, 105, "foo"));
+
+        // The new commitIndex is 101
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0);
+
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, appendEntries);
+
+        FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertFalse(syncStatus.isInitialSyncDone());
+        assertFalse("append entries reply should be false", reply.isSuccess());
     }
 
     @Test
@@ -152,7 +317,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -173,7 +338,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -202,7 +367,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -222,7 +387,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -243,7 +408,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -264,7 +429,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101);
+        appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -282,7 +447,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // leader-2 is becoming the leader now and it says the commitIndex is 45
-        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100);
+        appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -316,7 +481,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100);
+        AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0);
 
         follower = createBehavior(context);
         follower.handleMessage(leaderActor, appendEntries);
@@ -342,7 +507,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         // AppendEntries is now sent with a bigger term
         // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1);
+        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -390,7 +555,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1);
+        short leaderPayloadVersion = 10;
+        String leaderId = "leader-1";
+        AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion);
 
         follower = createBehavior(context);
 
@@ -402,6 +569,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("Entry 3", entries.get(0), log.get(3));
         assertEquals("Entry 4", entries.get(1), log.get(4));
 
+        assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion());
+        assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId());
+
         expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4);
     }
 
@@ -437,7 +607,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         // before the new behavior was created (1 in this case)
         // This will not work for a Candidate because as soon as a Candidate
         // is created it increments the term
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -462,6 +632,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
     }
 
+    @Test
+    public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+        logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
+
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
+
+        context.setReplicatedLog(log);
+
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+        entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
+
+        context.setRaftPolicy(createRaftPolicy(false, true));
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+    }
+
     @Test
     public void testHandleAppendEntriesPreviousLogEntryMissing(){
         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
@@ -480,7 +688,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -511,7 +719,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0));
 
         assertEquals("Next index", 2, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -523,7 +731,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
         leaderActor.underlyingActor().clear();
-        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1));
+        follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
         assertEquals("Entry 1", entries.get(0), log.get(1));
@@ -551,7 +759,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         List<ReplicatedLogEntry> entries = new ArrayList<>();
         entries.add(newReplicatedLogEntry(1, 4, "four"));
 
-        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3);
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0);
 
         follower = createBehavior(context);
 
@@ -574,25 +782,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.getTermInformation().update(1, "leader");
 
         follower = createBehavior(context);
 
-        HashMap<String, String> followerSnapshot = new HashMap<>();
-        followerSnapshot.put("1", "A");
-        followerSnapshot.put("2", "B");
-        followerSnapshot.put("3", "C");
-
-        ByteString bsSnapshot  = toByteString(followerSnapshot);
+        ByteString bsSnapshot  = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
 
         for(int i = 0; i < totalChunks; i++) {
-            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
             follower.handleMessage(leaderActor, lastInstallSnapshot);
@@ -604,6 +808,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor,
                 ApplySnapshot.class);
         Snapshot snapshot = applySnapshot.getSnapshot();
+        assertNotNull(lastInstallSnapshot);
         assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex());
         assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(),
                 snapshot.getLastAppliedTerm());
@@ -611,6 +816,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
         Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
+        applySnapshot.getCallback().onSuccess();
 
         List<InstallSnapshotReply> replies = MessageCollectorActor.getAllMatching(
                 leaderActor, InstallSnapshotReply.class);
@@ -624,7 +832,100 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
             assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
         }
 
-        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+    }
+
+
+    /**
+     * Verify that when an AppendEntries is sent to a follower during a snapshot install
+     * the Follower short-circuits the processing of the AppendEntries message.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
+
+        // Send an append entry
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+        assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+        assertNotNull(follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
+
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
+
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
+
+        // Send appendEntries with a new term and leader.
+        AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+        assertEquals("getTerm", 2, reply.getTerm());
+
+        assertNull(follower.getSnapshotTracker());
     }
 
     @Test
@@ -632,25 +933,21 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         logStart("testInitialSyncUpWithHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
+        context.setCommitIndex(-1);
 
         follower = createBehavior(context);
 
-        HashMap<String, String> followerSnapshot = new HashMap<>();
-        followerSnapshot.put("1", "A");
-        followerSnapshot.put("2", "B");
-        followerSnapshot.put("3", "C");
-
-        ByteString bsSnapshot  = toByteString(followerSnapshot);
+        ByteString bsSnapshot  = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
 
         for(int i = 0; i < totalChunks; i++) {
-            ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
+            byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize);
             lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
                     chunkData, chunkIndex, totalChunks);
             follower.handleMessage(leaderActor, lastInstallSnapshot);
@@ -676,7 +973,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
                 newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
-        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0);
         follower.handleMessage(leaderActor, appendEntries);
 
         syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
@@ -692,12 +989,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
         follower = createBehavior(context);
 
-        HashMap<String, String> followerSnapshot = new HashMap<>();
-        followerSnapshot.put("1", "A");
-        followerSnapshot.put("2", "B");
-        followerSnapshot.put("3", "C");
-
-        ByteString bsSnapshot = toByteString(followerSnapshot);
+        ByteString bsSnapshot = createSnapshot();
 
         InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1,
                 getNextChunk(bsSnapshot, 10, 50), 3, 3);
@@ -711,25 +1003,111 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getTerm", 1, reply.getTerm());
         assertEquals("getFollowerId", context.getId(), reply.getFollowerId());
 
-        assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker());
+        assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){
+        MockRaftActorContext context = createActorContext();
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        follower = createBehavior(context);
+
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+
+        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+
+        assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis());
+
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertTrue("Expected Candidate", newBehavior instanceof Candidate);
+    }
+
+    @Test
+    public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){
+        MockRaftActorContext context = createActorContext();
+        context.setConfigParams(new DefaultConfigParamsImpl(){
+            @Override
+            public FiniteDuration getElectionTimeOutInterval() {
+                return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+            }
+        });
+
+        context.setRaftPolicy(createRaftPolicy(false, false));
+
+        follower = createBehavior(context);
+
+        TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow);
+        assertSame("handleMessage result", follower, newBehavior);
     }
 
-    public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){
+    @Test
+    public void testFollowerSchedulesElectionIfNonVoting(){
+        MockRaftActorContext context = createActorContext();
+        context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false))));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval(
+                FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1);
+
+        follower = new Follower(context, "leader", (short)1);
+
+        ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor,
+                ElectionTimeout.class);
+        RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout);
+        assertSame("handleMessage result", follower, newBehavior);
+        assertNull("Expected null leaderId", follower.getLeaderId());
+    }
+
+    @Test
+    public void testElectionScheduledWhenAnyRaftRPCReceived(){
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, new RaftRPC() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public long getTerm() {
+                return 100;
+            }
+        });
+        verify(follower).scheduleElection(any(FiniteDuration.class));
+    }
+
+    @Test
+    public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){
+        MockRaftActorContext context = createActorContext();
+        follower = createBehavior(context);
+        follower.handleMessage(leaderActor, "non-raft-rpc");
+        verify(follower, never()).scheduleElection(any(FiniteDuration.class));
+    }
+
+    public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){
         int snapshotLength = bs.size();
         int start = offset;
         int size = chunkSize;
         if (chunkSize > snapshotLength) {
             size = snapshotLength;
         } else {
-            if ((start + chunkSize) > snapshotLength) {
+            if (start + chunkSize > snapshotLength) {
                 size = snapshotLength - start;
             }
         }
-        return bs.substring(start, start + size);
+
+        byte[] nextChunk = new byte[size];
+        bs.copyTo(nextChunk, start, 0, size);
+        return nextChunk;
     }
 
     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+        expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+    }
+
+    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+                                                   String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+                                                   boolean expForceInstallSnapshot) {
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
                 AppendEntriesReply.class);
@@ -739,24 +1117,36 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getFollowerId", expFollowerId, reply.getFollowerId());
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
+        assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+        assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
     }
 
-    private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+
+    private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
                 new MockRaftActorContext.MockPayload(data));
     }
 
+    private ByteString createSnapshot(){
+        HashMap<String, String> followerSnapshot = new HashMap<>();
+        followerSnapshot.put("1", "A");
+        followerSnapshot.put("2", "B");
+        followerSnapshot.put("3", "C");
+
+        return toByteString(followerSnapshot);
+    }
+
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
 
-        String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null;
+        String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
         assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor());
     }
 
     @Override
-    protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
+    protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
             throws Exception {
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
         assertEquals("isSuccess", true, reply.isSuccess());