Add LeaderTransitioning message to RaftActor
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index 51d6478fd6a0a6496b223e95dda79ed442083542..f4846d2589ad050632b47bc24c15b451e818b218 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
@@ -20,10 +21,14 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.japi.Procedure;
@@ -46,14 +51,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@@ -61,19 +71,24 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -105,6 +120,7 @@ public class RaftActorTest extends AbstractActorTest {
         kit.waitUntilLeader();
     }
 
+
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
@@ -121,7 +137,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, config), persistenceId);
 
             watch(followerActor);
 
@@ -174,7 +190,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, config));
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -208,7 +224,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
+                    config, new NonPersistentDataProvider()), persistenceId);
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -232,7 +248,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
+                    config, new NonPersistentDataProvider()).
                             withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
 
             InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
@@ -244,8 +260,8 @@ public class RaftActorTest extends AbstractActorTest {
 
             config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
             ref = factory.createTestActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()).
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
+                    new NonPersistentDataProvider()).
                             withDispatcher(Dispatchers.DefaultDispatcherId()),
                             factory.generateActorId("follower-"));
 
@@ -271,7 +287,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+                Collections.<String, String>emptyMap(), config), persistenceId);
 
         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -329,8 +345,8 @@ public class RaftActorTest extends AbstractActorTest {
 
         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
 
-        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).snapshotMessageSupport(mockSupport).props());
 
         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -338,34 +354,40 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.waitForRecoveryComplete();
 
         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(applySnapshot);
 
         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshot);
 
         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
         SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotSuccess);
 
         SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotFailure);
 
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
-        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
+        mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
+
+        verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
     }
 
     @Test
@@ -381,7 +403,7 @@ public class RaftActorTest extends AbstractActorTest {
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -412,7 +434,7 @@ public class RaftActorTest extends AbstractActorTest {
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -443,9 +465,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
-                    new NonPersistentDataProvider()).withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                    config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
+                            new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                            persistenceId);
 
             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
@@ -530,8 +553,9 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            factory.createActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+            factory.createActor(MockRaftActor.builder().id(persistenceId).
+                    peerAddresses(ImmutableMap.of("leader", "fake/path")).
+                    config(config).roleChangeNotifier(notifierActor).props());
 
             List<RoleChanged> matches =  null;
             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
@@ -581,8 +605,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(follower1Id, followerActor1.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
 
@@ -679,8 +702,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(leaderId, leaderActor1.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor followerActor = mockActorRef.underlyingActor();
                 followerActor.getRaftActorContext().setCommitIndex(4);
@@ -788,8 +810,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(follower2Id, followerActor2.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
                 leaderActor.getRaftActorContext().setCommitIndex(9);
@@ -866,8 +887,7 @@ public class RaftActorTest extends AbstractActorTest {
             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses,
-                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                    MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
             MockRaftActor leaderActor = mockActorRef.underlyingActor();
             leaderActor.getRaftActorContext().setCommitIndex(3);
@@ -914,8 +934,7 @@ public class RaftActorTest extends AbstractActorTest {
             Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses,
-                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                    MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
             MockRaftActor leaderActor = mockActorRef.underlyingActor();
             leaderActor.getRaftActorContext().setCommitIndex(3);
@@ -943,6 +962,40 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRaftActorOnRecoverySnapshot() throws Exception {
+        TEST_LOG.info("testRaftActorOnRecoverySnapshot");
+
+        new JavaTestKit(getSystem()) {{
+                String persistenceId = factory.generateActorId("follower-");
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                // Set the heartbeat interval high to essentially disable election otherwise the test
+                // may fail if the actor is switched to Leader
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
+
+                // Create mock ReplicatedLogEntry
+                ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
+                        new MockRaftActorContext.MockPayload("F", 1));
+
+                InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
+
+                TestActorRef<MockRaftActor> ref = factory.createTestActor(
+                        MockRaftActor.props(persistenceId, peerAddresses, config));
+
+                MockRaftActor mockRaftActor = ref.underlyingActor();
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
+                verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+            }};
+    }
+
     @Test
     public void testSwitchBehavior(){
         String persistenceId = factory.generateActorId("leader-");
@@ -957,8 +1010,7 @@ public class RaftActorTest extends AbstractActorTest {
         Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
 
         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                MockRaftActor.props(persistenceId, peerAddresses,
-                        Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
         MockRaftActor leaderActor = mockActorRef.underlyingActor();
 
@@ -1014,8 +1066,7 @@ public class RaftActorTest extends AbstractActorTest {
         DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
         TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
-                MockRaftActor.props(persistenceId, peerAddresses,
-                    Optional.<ConfigParams>of(emptyConfig), dataPersistenceProvider), persistenceId);
+                MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
         MockRaftActor mockRaftActor = actorRef.underlyingActor();
         mockRaftActor.waitForInitializeBehaviorComplete();
 
@@ -1053,4 +1104,268 @@ public class RaftActorTest extends AbstractActorTest {
         assertEquals("Behavior State", RaftState.Follower,
             mockRaftActor.getCurrentBehavior().state());
     }
+
+    @Test
+    public void testGetSnapshot() throws Exception {
+        TEST_LOG.info("testGetSnapshot starting");
+
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        long term = 3;
+        long seqN = 1;
+        InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
+                new MockRaftActorContext.MockPayload("A")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
+                new MockRaftActorContext.MockPayload("B")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
+                new MockRaftActorContext.MockPayload("C")));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        // Wait for snapshot after recovery
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+
+        mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+        ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+
+        byte[] stateSnapshot = new byte[]{1,2,3};
+        replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+
+        GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
+
+        assertEquals("getId", persistenceId, reply.getId());
+        Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
+        assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+        assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
+        assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
+
+        // Test with timeout
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
+        assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
+
+        // Test with persistence disabled.
+
+        mockRaftActor.setPersistence(false);
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        reply = kit.expectMsgClass(GetSnapshotReply.class);
+        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+
+        assertEquals("getId", persistenceId, reply.getId());
+        replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
+        assertEquals("getState length", 0, replySnapshot.getState().length);
+        assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
+
+        TEST_LOG.info("testGetSnapshot ending");
+    }
+
+    @Test
+    public void testRestoreFromSnapshot() throws Exception {
+        TEST_LOG.info("testRestoreFromSnapshot starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+        snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                new MockRaftActorContext.MockPayload("E")));
+
+        int snapshotLastApplied = 3;
+        int snapshotLastIndex = 4;
+
+        List<MockPayload> state = Arrays.asList(
+                new MockRaftActorContext.MockPayload("A"),
+                new MockRaftActorContext.MockPayload("B"),
+                new MockRaftActorContext.MockPayload("C"),
+                new MockRaftActorContext.MockPayload("D"));
+        ByteString stateBytes = fromObject(state);
+
+        Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
+                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
+        assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
+        assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
+        assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
+        assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
+
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
+
+        RaftActorContext context = mockRaftActor.getRaftActorContext();
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
+        assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
+        assertEquals("Recovered state", state, mockRaftActor.getState());
+        assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
+
+        // Test with data persistence disabled
+
+        snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, -1, -1, 5, "member-1");
+
+        persistenceId = factory.generateActorId("test-actor-");
+
+        raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
+                persistent(Optional.of(Boolean.FALSE)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+        assertEquals("snapshot committed", true,
+                Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
+
+        context = mockRaftActor.getRaftActorContext();
+        assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
+
+        TEST_LOG.info("testRestoreFromSnapshot ending");
+    }
+
+    @Test
+    public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
+        TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
+        Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
+                5, 2, 5, 2, 2, "member-1");
+
+        InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new MockRaftActorContext.MockPayload("B")));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(500).never()).applySnapshot(any(byte[].class));
+
+        RaftActorContext context = mockRaftActor.getRaftActorContext();
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
+
+        TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
+    }
+
+    @Test
+    public void testNonVotingOnRecovery() throws Exception {
+        TEST_LOG.info("testNonVotingOnRecovery starting");
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setElectionTimeoutFactor(1);
+        config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        InMemoryJournal.addEntry(persistenceId, 1,  new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        // Sleep a bit and verify it didn't get an election timeout and schedule an election.
+
+        Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+        assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
+
+        TEST_LOG.info("testNonVotingOnRecovery ending");
+    }
+
+    @Test
+    public void testLeaderTransitioning() throws Exception {
+        TEST_LOG.info("testLeaderTransitioning starting");
+
+        TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
+                0L, -1L, (short)1), ActorRef.noSender());
+        LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                notifierActor, LeaderStateChanged.class);
+        assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+
+        MessageCollectorActor.clearMessages(notifierActor);
+
+        raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
+
+        leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
+        assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+
+        TEST_LOG.info("testLeaderTransitioning ending");
+    }
 }