Bug 3570: Use SnapShot lastAppliedIndex for install snapshot 68/22368/1
authorTom Pantelis <tpanteli@brocade.com>
Wed, 10 Jun 2015 06:17:21 +0000 (02:17 -0400)
committerMoiz Raja <moraja@cisco.com>
Thu, 11 Jun 2015 14:48:56 +0000 (14:48 +0000)
Follow-up patch for https://git.opendaylight.org/gerrit/#/c/21904/ to
use the captured Snapshot's lastAppliedIndex for the lastIncludedIndex
field in the InstallSnapshot message and the follower's matchIndex/nextIndex
once the install completes. This is in lieu of using the leader's snapshotIndex
which typically lags behind the lastAppliedIndex by 1 due to the trimming of
the in-memory log. This avoids the leader sending its last log entry
redundantly after the install completes as the last entry was included
in the snapshot.

Change-Id: Ie821078b4316641b67e1b853b9264353dde6bfae
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 1dfb0b9105e9eb352ff2263434e79a5433e59e91)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SendInstallSnapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/SnapshotManagerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index f1881f5b0f228a62ee4e6e45a03d07c4b0a5e368..77875a567ed45e2552f56b23e5bac2a70a35d47d 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
 import java.util.List;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -281,14 +280,14 @@ public class SnapshotManager implements SnapshotState {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-            Snapshot sn = Snapshot.create(snapshotBytes,
+            Snapshot snapshot = Snapshot.create(snapshotBytes,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-            context.getPersistenceProvider().saveSnapshot(sn);
+            context.getPersistenceProvider().saveSnapshot(snapshot);
 
-            LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
+            LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), snapshot.getLogMessage());
 
             long dataThreshold = totalMemory *
                     context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
@@ -334,8 +333,7 @@ public class SnapshotManager implements SnapshotState {
             if (context.getId().equals(currentBehavior.getLeaderId())
                     && captureSnapshot.isInstallSnapshotInitiated()) {
                 // this would be call straight to the leader and won't initiate in serialization
-                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(
-                        ByteString.copyFrom(snapshotBytes)));
+                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
             }
 
             captureSnapshot = null;
index 83c85d9135a773541b40889ecf81453d9607833b..21c65e6037c106135f995323ae10b238029879eb 100644 (file)
@@ -8,16 +8,16 @@
 
 package org.opendaylight.controller.cluster.raft.base.messages;
 
-import com.google.protobuf.ByteString;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 
 public class SendInstallSnapshot {
-    private ByteString snapshot;
+    private final Snapshot snapshot;
 
-    public SendInstallSnapshot(ByteString snapshot) {
+    public SendInstallSnapshot(Snapshot snapshot) {
         this.snapshot = snapshot;
     }
 
-    public ByteString getSnapshot() {
+    public Snapshot getSnapshot() {
         return snapshot;
     }
 }
index de1e1d11ff0c2045e3056336a4bef5344d21a240..94fa507b47aff41a6d4308ca687db8639c955559 100644 (file)
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
 import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
@@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -88,7 +90,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     protected final int minIsolatedLeaderPeerCount;
 
-    private Optional<ByteString> snapshot;
+    private Optional<SnapshotHolder> snapshot;
 
     public AbstractLeader(RaftActorContext context) {
         super(context, RaftState.Leader);
@@ -140,8 +142,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(Optional<ByteString> snapshot) {
-        this.snapshot = snapshot;
+    void setSnapshot(@Nullable Snapshot snapshot) {
+        if(snapshot != null) {
+            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
+        } else {
+            this.snapshot = Optional.absent();
+        }
     }
 
     @Override
@@ -339,7 +345,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         } else if(message instanceof SendInstallSnapshot) {
             // received from RaftActor
-            setSnapshot(Optional.of(((SendInstallSnapshot) message).getSnapshot()));
+            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
             sendInstallSnapshot();
 
         } else if (message instanceof Replicate) {
@@ -382,10 +388,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         );
                     }
 
-                    followerLogInformation.setMatchIndex(
-                        context.getReplicatedLog().getSnapshotIndex());
-                    followerLogInformation.setNextIndex(
-                        context.getReplicatedLog().getSnapshotIndex() + 1);
+                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    followerLogInformation.setMatchIndex(followerMatchIndex);
+                    followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     mapFollowerToSnapshot.remove(followerId);
 
                     LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
@@ -395,7 +400,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     if (mapFollowerToSnapshot.isEmpty()) {
                         // once there are no pending followers receiving snapshots
                         // we can remove snapshot from the memory
-                        setSnapshot(Optional.<ByteString>absent());
+                        setSnapshot(null);
                     }
                     wasLastChunk = true;
 
@@ -618,7 +623,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
-                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
 
                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
                 // followerId to the followerToSnapshot map.
@@ -626,8 +631,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
-                        context.getReplicatedLog().getSnapshotIndex(),
-                        context.getReplicatedLog().getSnapshotTerm(),
+                        snapshot.get().getLastIncludedIndex(),
+                        snapshot.get().getLastIncludedTerm(),
                         nextSnapshotChunk,
                         followerToSnapshot.incrementChunkIndex(),
                         followerToSnapshot.getTotalChunks(),
@@ -871,4 +876,28 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     public int followerLogSize() {
         return followerToLog.size();
     }
+
+    private static class SnapshotHolder {
+        private final long lastIncludedTerm;
+        private final long lastIncludedIndex;
+        private final ByteString snapshotBytes;
+
+        SnapshotHolder(Snapshot snapshot) {
+            this.lastIncludedTerm = snapshot.getLastAppliedTerm();
+            this.lastIncludedIndex = snapshot.getLastAppliedIndex();
+            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+        }
+
+        long getLastIncludedTerm() {
+            return lastIncludedTerm;
+        }
+
+        long getLastIncludedIndex() {
+            return lastIncludedIndex;
+        }
+
+        ByteString getSnapshotBytes() {
+            return snapshotBytes;
+        }
+    }
 }
index 9fe03f9673c9f55cf8759dc51cba69b2c5bf437d..b264739bd1fa630da7f461dbdf207809ae7fddf9 100644 (file)
@@ -20,7 +20,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -182,16 +181,11 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Wait for the follower to persist the snapshot.
         MessageCollectorActor.expectFirstMatching(follower2CollectorActor, SaveSnapshotSuccess.class);
 
-        // The last applied entry on the leader is included in the snapshot but is also sent in a subsequent
-        // AppendEntries because the InstallSnapshot message lastIncludedIndex field is set to the leader's
-        // snapshotIndex and not the actual last index included in the snapshot.
-        // FIXME? - is this OK?
-        MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
-        List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2, payload2);
+        List<MockPayload> expFollowerState = Arrays.asList(payload0, payload1, payload2);
 
         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
-        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
 
         killActor(follower2Actor);
@@ -205,7 +199,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         assertEquals("Follower commit index", 2, follower2Context.getCommitIndex());
         assertEquals("Follower last applied", 2, follower2Context.getLastApplied());
-        assertEquals("Follower snapshot index", 1, follower2Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower snapshot index", 2, follower2Context.getReplicatedLog().getSnapshotIndex());
         assertEquals("Follower state", expFollowerState, follower2Underlying.getState());
     }
 
index 0a6082d60db42f4a87cce5ad440e41454c42924b..6c258d33c83c8237bd96de4a9e19413524028b29 100644 (file)
@@ -485,7 +485,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         assertEquals("InstallSnapshot getChunkIndex", 1, installSnapshot.getChunkIndex());
         assertEquals("InstallSnapshot getTotalChunks", 1, installSnapshot.getTotalChunks());
         assertEquals("InstallSnapshot getLastIncludedTerm", currentTerm, installSnapshot.getLastIncludedTerm());
-        assertEquals("InstallSnapshot getLastIncludedIndex", 8, installSnapshot.getLastIncludedIndex());
+        assertEquals("InstallSnapshot getLastIncludedIndex", 9, installSnapshot.getLastIncludedIndex());
         //assertArrayEquals("InstallSnapshot getData", snapshot, installSnapshot.getData().toByteArray());
 
         installSnapshotReply = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, InstallSnapshotReply.class);
@@ -496,13 +496,9 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         // Verify follower 2 applies the snapshot.
         applySnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplySnapshot.class);
-        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 8, currentTerm, 8);
+        verifySnapshot("Follower 2", applySnapshot.getSnapshot(), currentTerm, 9, currentTerm, 9);
         assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, applySnapshot.getSnapshot().getUnAppliedEntries().size());
 
-        // Verify follower 2 only applies the second log entry (9) as the first one (8) was in the snapshot.
-        applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
-        verifyApplyState(applyState, null, null, currentTerm, 9, payload9);
-
         // Wait for the snapshot to complete.
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
 
index d94eb6b041a52631c5ca02684e0c24bc32b170e6..1829a2d5708d0e8f80e71cae7c7962b7ebd1a363 100644 (file)
@@ -298,7 +298,7 @@ public class SnapshotManagerTest extends AbstractActorTest {
 
         SendInstallSnapshot sendInstallSnapshot = sendInstallSnapshotArgumentCaptor.getValue();
 
-        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().toByteArray()));
+        assertTrue(Arrays.equals(bytes, sendInstallSnapshot.getSnapshot().getState()));
     }
 
     @Test
@@ -609,4 +609,4 @@ public class SnapshotManagerTest extends AbstractActorTest {
         assertEquals("getTerm", -1L, reader.getTerm());
         assertEquals("getIndex", -1L, reader.getIndex());
     }
-}
\ No newline at end of file
+}
index 07548d6a9248b450e9ee478fabbe6dd7d357829e..89b83b3b369e8e5ed70349e2a1c8fcd223d7c9b4 100644 (file)
@@ -9,10 +9,10 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
@@ -439,8 +440,8 @@ public class LeaderTest extends AbstractLeaderTest {
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int newEntryIndex = 4;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
@@ -448,12 +449,15 @@ public class LeaderTest extends AbstractLeaderTest {
         // set the snapshot variables in replicatedlog
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
         //set follower timeout to 2 mins, helps during debugging
         actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         // new entry
         ReplicatedLogImplEntry entry =
                 new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -463,7 +467,8 @@ public class LeaderTest extends AbstractLeaderTest {
         leader.markFollowerActive(FOLLOWER_ID);
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
@@ -489,7 +494,7 @@ public class LeaderTest extends AbstractLeaderTest {
 
         InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
-        assertEquals(snapshotIndex, is.getLastIncludedIndex());
+        assertEquals(commitIndex, is.getLastIncludedIndex());
     }
 
     @Test
@@ -573,7 +578,7 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         // set the snapshot as absent and check if capture-snapshot is invoked.
-        leader.setSnapshot(Optional.<ByteString>absent());
+        leader.setSnapshot(null);
 
         // new entry
         ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
@@ -616,8 +621,8 @@ public class LeaderTest extends AbstractLeaderTest {
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -625,15 +630,22 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
         actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
 
         leader = new Leader(actorContext);
 
-        // Ignore initial heartbeat.
+        // Initial heartbeat.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
-                new SendInstallSnapshot(toByteString(leadersSnapshot)));
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         assertTrue(raftBehavior instanceof Leader);
 
@@ -642,7 +654,7 @@ public class LeaderTest extends AbstractLeaderTest {
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
         assertNotNull(installSnapshot.getData());
-        assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
         assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
 
         assertEquals(currentTerm, installSnapshot.getTerm());
@@ -654,15 +666,18 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         // Ignore initial heartbeat.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
@@ -678,7 +693,8 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
@@ -698,9 +714,8 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex, fli.getMatchIndex());
-        assertEquals(snapshotIndex + 1, fli.getNextIndex());
+        assertEquals(commitIndex, fli.getMatchIndex());
+        assertEquals(commitIndex + 1, fli.getNextIndex());
     }
 
     @Test
@@ -709,8 +724,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -724,10 +739,13 @@ public class LeaderTest extends AbstractLeaderTest {
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
 
         actorContext.setConfigParams(configParams);
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -739,9 +757,11 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -780,8 +800,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -792,10 +812,13 @@ public class LeaderTest extends AbstractLeaderTest {
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -807,10 +830,12 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
 
@@ -839,8 +864,8 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        final int followersLastIndex = 2;
-        final int snapshotIndex = 3;
+        final int commitIndex = 3;
+        final int snapshotIndex = 2;
         final int snapshotTerm = 1;
         final int currentTerm = 2;
 
@@ -851,10 +876,13 @@ public class LeaderTest extends AbstractLeaderTest {
             }
         });
 
-        actorContext.setCommitIndex(followersLastIndex);
+        actorContext.setCommitIndex(commitIndex);
 
         leader = new Leader(actorContext);
 
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
         leadersSnapshot.put("2", "B");
@@ -866,9 +894,11 @@ public class LeaderTest extends AbstractLeaderTest {
         actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
 
         ByteString bs = toByteString(leadersSnapshot);
-        leader.setSnapshot(Optional.of(bs));
+        Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+                commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+        leader.setSnapshot(snapshot);
 
-        leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+        leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
 
         InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);