Avoid unnecessary unsuccessful AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / FollowerTest.java
index 8cb914c2267b51375ef819ce26f9ff0818882347..4dd2b9b5363b41b7d56bcc7adb912a753526bf76 100644 (file)
@@ -14,23 +14,23 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import akka.actor.ActorRef;
-import akka.actor.Props;
 import akka.dispatch.Dispatchers;
-import akka.testkit.JavaTestKit;
+import akka.protobuf.ByteString;
 import akka.testkit.TestActorRef;
+import akka.testkit.javadsl.TestKit;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,12 +44,11 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActor;
 import org.opendaylight.controller.cluster.raft.MockRaftActor.Builder;
+import org.opendaylight.controller.cluster.raft.MockRaftActor.MockSnapshotState;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.cluster.raft.RaftActorTest;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -63,9 +62,12 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
@@ -75,11 +77,11 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
-    private final TestActorRef<MessageCollectorActor> followerActor = actorFactory.createTestActor(
-            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower"));
+    private final ActorRef followerActor = actorFactory.createActor(
+            MessageCollectorActor.props(), actorFactory.generateActorId("follower"));
 
-    private final TestActorRef<MessageCollectorActor> leaderActor = actorFactory.createTestActor(
-            Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader"));
+    private final ActorRef leaderActor = actorFactory.createActor(
+            MessageCollectorActor.props(), actorFactory.generateActorId("leader"));
 
     private Follower follower;
 
@@ -87,7 +89,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
     @Override
     @After
-    public void tearDown() throws Exception {
+    public void tearDown() {
         if (follower != null) {
             follower.close();
         }
@@ -96,7 +98,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Override
-    protected Follower createBehavior(RaftActorContext actorContext) {
+    protected Follower createBehavior(final RaftActorContext actorContext) {
         return spy(new Follower(actorContext));
     }
 
@@ -106,9 +108,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Override
-    protected  MockRaftActorContext createActorContext(ActorRef actorRef) {
+    protected  MockRaftActorContext createActorContext(final ActorRef actorRef) {
         MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
-        context.setPayloadVersion(payloadVersion );
+        context.setPayloadVersion(payloadVersion);
         return context;
     }
 
@@ -205,7 +207,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
 
     @Test
-    public void testHandleFirstAppendEntries() throws Exception {
+    public void testHandleFirstAppendEntries() {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
@@ -233,7 +235,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
         logStart("testHandleFirstAppendEntries");
 
         MockRaftActorContext context = createActorContext();
@@ -256,9 +258,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
-            throws Exception {
-        logStart("testHandleFirstAppendEntries");
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -283,9 +284,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
-            throws Exception {
-        logStart("testHandleFirstAppendEntries");
+    public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
+        logStart("testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot");
 
         MockRaftActorContext context = createActorContext();
         context.getReplicatedLog().clear(0,2);
@@ -309,8 +309,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
-            throws Exception {
+    public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
         logStart(
                "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
 
@@ -336,7 +335,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testHandleSyncUpAppendEntries() throws Exception {
+    public void testHandleSyncUpAppendEntries() {
         logStart("testHandleSyncUpAppendEntries");
 
         MockRaftActorContext context = createActorContext();
@@ -356,15 +355,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertFalse(syncStatus.isInitialSyncDone());
 
         // Clear all the messages
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         context.setLastApplied(101);
         context.setCommitIndex(101);
-        setLastLogEntry(context, 1, 101,
-                new MockRaftActorContext.MockPayload(""));
+        setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
 
-        entries = Arrays.asList(
-                newReplicatedLogEntry(2, 101, "foo"));
+        entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
 
         // The new commitIndex is 101
         appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
@@ -374,7 +371,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         assertTrue(syncStatus.isInitialSyncDone());
 
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         // Sending the same message again should not generate another message
 
@@ -383,11 +380,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
 
         assertNull(syncStatus);
-
     }
 
     @Test
-    public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
+    public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
         logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
 
         MockRaftActorContext context = createActorContext();
@@ -407,7 +403,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertFalse(syncStatus.isInitialSyncDone());
 
         // Clear all the messages
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         context.setLastApplied(100);
         setLastLogEntry(context, 1, 100,
@@ -424,12 +420,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         // We get a new message saying initial status is not done
         assertFalse(syncStatus.isInitialSyncDone());
-
     }
 
-
     @Test
-    public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
+    public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
         logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
 
         MockRaftActorContext context = createActorContext();
@@ -449,7 +443,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertFalse(syncStatus.isInitialSyncDone());
 
         // Clear all the messages
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         context.setLastApplied(101);
         context.setCommitIndex(101);
@@ -468,7 +462,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertTrue(syncStatus.isInitialSyncDone());
 
         // Clear all the messages
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         context.setLastApplied(100);
         setLastLogEntry(context, 1, 100,
@@ -485,10 +479,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         // We get a new message saying initial status is not done
         assertFalse(syncStatus.isInitialSyncDone());
-
     }
 
-
     /**
      * This test verifies that when an AppendEntries RPC is received by a RaftActor
      * with a commitIndex that is greater than what has been applied to the
@@ -496,7 +488,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
      * sets it current applied state to the commitIndex of the sender.
      */
     @Test
-    public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
+    public void testHandleAppendEntriesWithNewerCommitIndex() {
         logStart("testHandleAppendEntriesWithNewerCommitIndex");
 
         MockRaftActorContext context = createActorContext();
@@ -519,7 +511,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     /**
-     * This test verifies that when an AppendEntries is received a specific prevLogTerm
+     * This test verifies that when an AppendEntries is received with a prevLogTerm
      * which does not match the term that is in RaftActors log entry at prevLogIndex
      * then the RaftActor does not change it's state and it returns a failure.
      */
@@ -529,12 +521,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         MockRaftActorContext context = createActorContext();
 
-        // First set the receivers term to lower number
-        context.getTermInformation().update(95, "test");
-
-        // AppendEntries is now sent with a bigger term
-        // this will set the receivers term to be the same as the sender's term
-        AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0);
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 0, 2, Collections.emptyList(), 101, -1, (short)0);
 
         follower = createBehavior(context);
 
@@ -548,6 +535,28 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("isSuccess", false, reply.isSuccess());
     }
 
+    @Test
+    public void testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot() {
+        logStart("testHandleAppendEntriesSenderPrevLogIndexIsInTheSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+        context.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(5, 8, 3).build());
+        context.getReplicatedLog().setSnapshotIndex(4);
+        context.getReplicatedLog().setSnapshotTerm(3);
+
+        AppendEntries appendEntries = new AppendEntries(3, "leader", 1, 3, Collections.emptyList(), 8, -1, (short)0);
+
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+        assertEquals("isSuccess", true, reply.isSuccess());
+    }
+
     /**
      * This test verifies that when a new AppendEntries message is received with
      * new entries and the logs of the sender and receiver match that the new
@@ -755,7 +764,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two"));
 
-        leaderActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(leaderActor);
         follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0));
 
         assertEquals("Next index", 3, log.last().getIndex() + 1);
@@ -801,7 +810,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
      * the follower its applied correctly.
      */
     @Test
-    public void testHandleInstallSnapshot() throws Exception {
+    public void testHandleInstallSnapshot() {
         logStart("testHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -809,7 +818,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         follower = createBehavior(context);
 
-        ByteString bsSnapshot  = createSnapshot();
+        ByteString bsSnapshot = createSnapshot();
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
@@ -838,7 +847,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(),
                 snapshot.getLastAppliedIndex());
         assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm());
-        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState());
+        assertEquals("getState type", ByteState.class, snapshot.getState().getClass());
+        Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), ((ByteState)snapshot.getState()).getBytes());
         assertEquals("getElectionTerm", 1, snapshot.getElectionTerm());
         assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor());
         applySnapshot.getCallback().onSuccess();
@@ -858,13 +868,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
     }
 
-
     /**
      * Verify that when an AppendEntries is sent to a follower during a snapshot install
      * the Follower short-circuits the processing of the AppendEntries message.
      */
     @Test
-    public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+    public void testReceivingAppendEntriesDuringInstallSnapshot() {
         logStart("testReceivingAppendEntriesDuringInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -907,7 +916,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
         logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
 
         MockRaftActorContext context = createActorContext();
@@ -950,7 +959,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
+    public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
         logStart("testInitialSyncUpWithHandleInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -983,7 +992,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertFalse(syncStatus.isInitialSyncDone());
 
         // Clear all the messages
-        followerActor.underlyingActor().clear();
+        MessageCollectorActor.clearMessages(followerActor);
 
         context.setLastApplied(101);
         context.setCommitIndex(101);
@@ -1003,7 +1012,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+    public void testHandleOutOfSequenceInstallSnapshot() {
         logStart("testHandleOutOfSequenceInstallSnapshot");
 
         MockRaftActorContext context = createActorContext();
@@ -1105,7 +1114,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+    public void testCaptureSnapshotOnLastEntryInAppendEntries() {
         String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
         logStart(id);
 
@@ -1126,6 +1135,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
 
         List<ReplicatedLogEntry> entries = Arrays.asList(
                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"));
@@ -1140,6 +1150,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
 
         InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
         // We expect the ApplyJournalEntries for index 1 to remain in the persisted log b/c it's still queued for
         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
         // This is OK - on recovery it will be a no-op since index 1 has already been applied.
@@ -1154,11 +1165,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 1, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @Test
-    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+    public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
         String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
         logStart(id);
 
@@ -1179,6 +1190,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
 
         List<ReplicatedLogEntry> entries = Arrays.asList(
                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
@@ -1194,6 +1206,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
 
         InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
         // We expect the ApplyJournalEntries for index 2 to remain in the persisted log b/c it's still queued for
         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
         // This is OK - on recovery it will be a no-op since index 2 has already been applied.
@@ -1208,14 +1221,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData(), entries.get(1).getData(),
-                entries.get(2).getData()), MockRaftActor.toObject(snapshot.getState()));
+                entries.get(2).getData()), MockRaftActor.fromState(snapshot.getState()));
 
         assertEquals("Journal size", 0, followerRaftActor.get().getReplicatedLog().size());
         assertEquals("Snapshot index", 2, followerRaftActor.get().getReplicatedLog().getSnapshotIndex());
 
         // Reinstate the actor from persistence
 
-        actorFactory.killActor(followerActorRef, new JavaTestKit(getSystem()));
+        actorFactory.killActor(followerActorRef, new TestKit(getSystem()));
 
         followerActorRef = actorFactory.createTestActor(builder.props()
                 .withDispatcher(Dispatchers.DefaultDispatcherId()), id);
@@ -1231,7 +1244,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Test
-    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+    public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
         String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
         logStart(id);
 
@@ -1252,6 +1265,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
 
         InMemorySnapshotStore.addSnapshotSavedLatch(id);
         InMemoryJournal.addDeleteMessagesCompleteLatch(id);
+        InMemoryJournal.addWriteMessagesCompleteLatch(id, 1, ApplyJournalEntries.class);
 
         List<ReplicatedLogEntry> entries = Arrays.asList(
                 newReplicatedLogEntry(1, 0, "one"), newReplicatedLogEntry(1, 1, "two"),
@@ -1267,6 +1281,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         final Snapshot snapshot = InMemorySnapshotStore.waitForSavedSnapshot(id, Snapshot.class);
 
         InMemoryJournal.waitForDeleteMessagesComplete(id);
+        InMemoryJournal.waitForWriteMessagesComplete(id);
         // We expect the ApplyJournalEntries for index 0 to remain in the persisted log b/c it's still queued for
         // persistence by the time we initiate capture so the last persisted journal sequence number doesn't include it.
         // This is OK - on recovery it will be a no-op since index 0 has already been applied.
@@ -1283,30 +1298,39 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertEquals("Snapshot getLastTerm", 1, snapshot.getLastTerm());
         assertEquals("Snapshot getLastIndex", 2, snapshot.getLastIndex());
         assertEquals("Snapshot state", ImmutableList.of(entries.get(0).getData()),
-                MockRaftActor.toObject(snapshot.getState()));
+                MockRaftActor.fromState(snapshot.getState()));
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+    private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
+            final AtomicReference<MockRaftActor> followerRaftActor) {
         RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
             @Override
-            public void createSnapshot(ActorRef actorRef) {
+            public void createSnapshot(final ActorRef actorRef,
+                    final java.util.Optional<OutputStream> installSnapshotStream) {
                 try {
-                    actorRef.tell(new CaptureSnapshotReply(RaftActorTest.fromObject(
-                            followerRaftActor.get().getState()).toByteArray()), actorRef);
+                    actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
+                            installSnapshotStream), actorRef);
+                } catch (RuntimeException e) {
+                    throw e;
                 } catch (Exception e) {
-                    Throwables.propagate(e);
+                    throw new RuntimeException(e);
                 }
             }
 
             @Override
-            public void applySnapshot(byte[] snapshotBytes) {
+            public void applySnapshot(final State snapshotState) {
+            }
+
+            @Override
+            public State deserializeSnapshot(final ByteSource snapshotBytes) {
+                throw new UnsupportedOperationException();
             }
         };
         return snapshotCohort;
     }
 
-    public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
+    public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
         int snapshotLength = bs.size();
         int start = offset;
         int size = chunkSize;
@@ -1323,14 +1347,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         return nextChunk;
     }
 
-    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
-            String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+    private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
+            final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
         expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
     }
 
-    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
-                                                   String expFollowerId, long expLogLastTerm, long expLogLastIndex,
-                                                   boolean expForceInstallSnapshot) {
+    private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
+            final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
+            final boolean expForceInstallSnapshot) {
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
                 AppendEntriesReply.class);
@@ -1345,7 +1369,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
 
-    private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+    private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
         return new SimpleReplicatedLogEntry(index, term,
                 new MockRaftActorContext.MockPayload(data));
     }
@@ -1360,8 +1384,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Override
-    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
-            ActorRef actorRef, RaftRPC rpc) throws Exception {
+    protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+            final ActorRef actorRef, final RaftRPC rpc) {
         super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
 
         String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
@@ -1369,8 +1393,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
     }
 
     @Override
-    protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
-            throws Exception {
+    protected void handleAppendEntriesAddSameEntryToLogReply(final ActorRef replyActor) {
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
         assertEquals("isSuccess", true, reply.isSuccess());
     }