Remove use of {String,UUID}Identifier
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index e5c8677b0466fa0b282a413ef2291dd0318ce01e..48b555faff286c0393cb2c6ec39030c651751822 100644 (file)
@@ -1,21 +1,36 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.cluster.raft;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
+import akka.dispatch.Dispatchers;
 import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
@@ -36,32 +51,44 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import org.opendaylight.yangtools.concepts.Identifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 public class RaftActorTest extends AbstractActorTest {
@@ -93,6 +120,7 @@ public class RaftActorTest extends AbstractActorTest {
         kit.waitUntilLeader();
     }
 
+
     @Test
     public void testRaftActorRecoveryWithPersistenceEnabled() throws Exception {
         TEST_LOG.info("testRaftActorRecoveryWithPersistenceEnabled starting");
@@ -109,7 +137,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
             ActorRef followerActor = factory.createActor(MockRaftActor.props(persistenceId,
-                    peerAddresses, Optional.<ConfigParams>of(config)), persistenceId);
+                    peerAddresses, config), persistenceId);
 
             watch(followerActor);
 
@@ -162,7 +190,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             //reinstate the actor
             TestActorRef<MockRaftActor> ref = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses, Optional.<ConfigParams>of(config)));
+                    MockRaftActor.props(persistenceId, peerAddresses, config));
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -196,7 +224,7 @@ public class RaftActorTest extends AbstractActorTest {
 
             TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
                     ImmutableMap.<String, String>builder().put("member1", "address").build(),
-                    Optional.<ConfigParams>of(config), new NonPersistentDataProvider()), persistenceId);
+                    config, new NonPersistentDataProvider()), persistenceId);
 
             MockRaftActor mockRaftActor = ref.underlyingActor();
 
@@ -208,6 +236,48 @@ public class RaftActorTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testUpdateElectionTermPersistedWithPersistenceDisabled() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            String persistenceId = factory.generateActorId("follower-");
+            DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+            config.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+            config.setElectionTimeoutFactor(1);
+
+            InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 1);
+
+            TestActorRef<MockRaftActor> ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(),
+                    config, new NonPersistentDataProvider()).
+                            withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+
+            InMemoryJournal.waitForWriteMessagesComplete(persistenceId);
+            List<UpdateElectionTerm> entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+            assertEquals("UpdateElectionTerm entries", 1, entries.size());
+            UpdateElectionTerm updateEntry = entries.get(0);
+
+            factory.killActor(ref, this);
+
+            config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+            ref = factory.createTestActor(MockRaftActor.props(persistenceId,
+                    ImmutableMap.<String, String>builder().put("member1", "address").build(), config,
+                    new NonPersistentDataProvider()).
+                            withDispatcher(Dispatchers.DefaultDispatcherId()),
+                            factory.generateActorId("follower-"));
+
+            MockRaftActor actor = ref.underlyingActor();
+            actor.waitForRecoveryComplete();
+
+            RaftActorContext newContext = actor.getRaftActorContext();
+            assertEquals("electionTerm", updateEntry.getCurrentTerm(),
+                    newContext.getTermInformation().getCurrentTerm());
+            assertEquals("votedFor", updateEntry.getVotedFor(), newContext.getTermInformation().getVotedFor());
+
+            entries = InMemoryJournal.get(persistenceId, UpdateElectionTerm.class);
+            assertEquals("UpdateElectionTerm entries", 1, entries.size());
+        }};
+    }
+
     @Test
     public void testRaftActorForwardsToRaftActorRecoverySupport() {
         String persistenceId = factory.generateActorId("leader-");
@@ -217,7 +287,7 @@ public class RaftActorTest extends AbstractActorTest {
         config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
         TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config)), persistenceId);
+                Collections.<String, String>emptyMap(), config), persistenceId);
 
         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -238,9 +308,6 @@ public class RaftActorTest extends AbstractActorTest {
         ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
         mockRaftActor.handleRecover(applyJournalEntries);
 
-        ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
-        mockRaftActor.handleRecover(applyLogEntries);
-
         DeleteEntries deleteEntries = new DeleteEntries(1);
         mockRaftActor.handleRecover(deleteEntries);
 
@@ -251,13 +318,17 @@ public class RaftActorTest extends AbstractActorTest {
         UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
         mockRaftActor.handleRecover(updateElectionTerm);
 
-        verify(mockSupport).handleRecoveryMessage(same(snapshotOffer));
-        verify(mockSupport).handleRecoveryMessage(same(logEntry));
-        verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries));
-        verify(mockSupport).handleRecoveryMessage(same(applyLogEntries));
-        verify(mockSupport).handleRecoveryMessage(same(deleteEntries));
-        verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries));
-        verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm));
+        org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
+                new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
+        mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
+
+        verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
+        verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
     }
 
     @Test
@@ -270,8 +341,8 @@ public class RaftActorTest extends AbstractActorTest {
 
         RaftActorSnapshotMessageSupport mockSupport = mock(RaftActorSnapshotMessageSupport.class);
 
-        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), mockSupport), persistenceId);
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).snapshotMessageSupport(mockSupport).props());
 
         MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -279,34 +350,40 @@ public class RaftActorTest extends AbstractActorTest {
         mockRaftActor.waitForRecoveryComplete();
 
         ApplySnapshot applySnapshot = new ApplySnapshot(mock(Snapshot.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(applySnapshot);
 
         CaptureSnapshot captureSnapshot = new CaptureSnapshot(1, 1, 1, 1, 0, 1, null);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshot);
 
         CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(new byte[0]);
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
         mockRaftActor.handleCommand(captureSnapshotReply);
 
-        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(mock(SnapshotMetadata.class));
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
+        SaveSnapshotSuccess saveSnapshotSuccess = new SaveSnapshotSuccess(new SnapshotMetadata("", 0L, 0L));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotSuccess);
 
-        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(mock(SnapshotMetadata.class), new Throwable());
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
+        SaveSnapshotFailure saveSnapshotFailure = new SaveSnapshotFailure(new SnapshotMetadata("", 0L, 0L), new Throwable());
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
         mockRaftActor.handleCommand(saveSnapshotFailure);
 
-        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
         mockRaftActor.handleCommand(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT);
 
-        verify(mockSupport).handleSnapshotMessage(same(applySnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot));
-        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess));
-        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure));
-        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT));
+        doReturn(true).when(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
+        mockRaftActor.handleCommand(GetSnapshot.INSTANCE);
+
+        verify(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshot), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotSuccess), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(saveSnapshotFailure), any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT),
+                any(ActorRef.class));
+        verify(mockSupport).handleSnapshotMessage(same(GetSnapshot.INSTANCE), any(ActorRef.class));
     }
 
     @Test
@@ -322,7 +399,7 @@ public class RaftActorTest extends AbstractActorTest {
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -332,7 +409,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplyJournalEntries(10));
 
-                verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+                verify(dataPersistenceProvider).persist(any(ApplyJournalEntries.class), any(Procedure.class));
 
             }
 
@@ -353,7 +430,7 @@ public class RaftActorTest extends AbstractActorTest {
                 DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                        Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -362,9 +439,10 @@ public class RaftActorTest extends AbstractActorTest {
                 ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                         new MockRaftActorContext.MockPayload("F"));
 
-                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+                final Identifier id = new MockIdentifier("apply-state");
+                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
 
-                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
 
             }
         };
@@ -384,9 +462,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-                    Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor,
-                    new NonPersistentDataProvider()), persistenceId);
+            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                    config(config).roleChangeNotifier(notifierActor).dataPersistenceProvider(
+                            new NonPersistentDataProvider()).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                            persistenceId);
 
             List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
@@ -413,20 +492,23 @@ public class RaftActorTest extends AbstractActorTest {
                     notifierActor, LeaderStateChanged.class);
 
             assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+            assertEquals(MockRaftActor.PAYLOAD_VERSION, leaderStateChange.getLeaderPayloadVersion());
 
             notifierActor.underlyingActor().clear();
 
             MockRaftActor raftActor = raftActorRef.underlyingActor();
             final String newLeaderId = "new-leader";
+            final short newLeaderVersion = 6;
             Follower follower = new Follower(raftActor.getRaftActorContext()) {
                 @Override
                 public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
-                    leaderId = newLeaderId;
+                    setLeaderId(newLeaderId);
+                    setLeaderPayloadVersion(newLeaderVersion);
                     return this;
                 }
             };
 
-            raftActor.changeCurrentBehavior(follower);
+            raftActor.newBehavior(follower);
 
             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
             assertEquals(persistenceId, leaderStateChange.getMemberId());
@@ -443,6 +525,15 @@ public class RaftActorTest extends AbstractActorTest {
             leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
             assertEquals(persistenceId, leaderStateChange.getMemberId());
             assertEquals(newLeaderId, leaderStateChange.getLeaderId());
+            assertEquals(newLeaderVersion, leaderStateChange.getLeaderPayloadVersion());
+
+            notifierActor.underlyingActor().clear();
+
+            raftActor.handleCommand("any");
+
+            Uninterruptibles.sleepUninterruptibly(505, TimeUnit.MILLISECONDS);
+            leaderStateChange = MessageCollectorActor.getFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertNull(leaderStateChange);
         }};
     }
 
@@ -459,8 +550,9 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            factory.createActor(MockRaftActor.props(persistenceId,
-                    ImmutableMap.of("leader", "fake/path"), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
+            factory.createActor(MockRaftActor.builder().id(persistenceId).
+                    peerAddresses(ImmutableMap.of("leader", "fake/path")).
+                    config(config).roleChangeNotifier(notifierActor).props());
 
             List<RoleChanged> matches =  null;
             for(int i = 0; i < 5000 / heartBeatInterval; i++) {
@@ -510,8 +602,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(follower1Id, followerActor1.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
 
@@ -542,13 +633,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
                 //fake snapshot on index 5
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 5, 1, (short)0));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 //fake snapshot on index 6
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 6, 1, (short)0));
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
@@ -563,9 +654,9 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-4")));
 
                 leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotBytes.toByteArray(),
-                        leader, Runtime.getRuntime().totalMemory());
+                        Runtime.getRuntime().totalMemory());
 
-                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
                 leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
@@ -579,7 +670,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new ReplicatedLogImplEntry(8, 1, new MockRaftActorContext.MockPayload("foo-8")));
 
                 //fake snapshot on index 7, since lastApplied = 7 , we would keep the last applied
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 7, 1, (short)0));
                 assertEquals(2, leaderActor.getReplicatedLog().size());
                 assertEquals(8, leaderActor.getReplicatedLog().lastIndex());
 
@@ -608,8 +699,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(leaderId, leaderActor1.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor followerActor = mockActorRef.underlyingActor();
                 followerActor.getRaftActorContext().setCommitIndex(4);
@@ -645,7 +735,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                         new MockRaftActorContext.MockPayload("foo-6"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 5, 1, entries, 5, 5, (short)0));
                 assertEquals(7, followerActor.getReplicatedLog().size());
 
                 //fake snapshot on index 7
@@ -656,7 +746,7 @@ public class RaftActorTest extends AbstractActorTest {
                                 (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 6, 1, entries, 6, 6, (short) 0));
                 assertEquals(8, followerActor.getReplicatedLog().size());
 
                 assertEquals(RaftState.Follower, followerActor.getCurrentBehavior().state());
@@ -669,7 +759,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 followerActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 // The commit is needed to complete the snapshot creation process
                 followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
@@ -684,7 +774,7 @@ public class RaftActorTest extends AbstractActorTest {
                                         new MockRaftActorContext.MockPayload("foo-7"))
                         );
                 // send an additional entry 8 with leaderCommit = 7
-                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7));
+                followerActor.onReceiveCommand(new AppendEntries(1, leaderId, 7, 1, entries, 7, 7, (short) 0));
 
                 // 7 and 8, as lastapplied is 7
                 assertEquals(2, followerActor.getReplicatedLog().size());
@@ -717,8 +807,7 @@ public class RaftActorTest extends AbstractActorTest {
                 peerAddresses.put(follower2Id, followerActor2.path().toString());
 
                 TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                        MockRaftActor.props(persistenceId, peerAddresses,
-                                Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                        MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
                 MockRaftActor leaderActor = mockActorRef.underlyingActor();
                 leaderActor.getRaftActorContext().setCommitIndex(9);
@@ -742,17 +831,17 @@ public class RaftActorTest extends AbstractActorTest {
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower1Id, 1, true, 9, 1, (short) 0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // set the 2nd follower nextIndex to 1 which has been snapshotted
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 0, 1, (short)0));
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
                 // simulate a real snapshot
-                leaderActor.onReceiveCommand(new SendHeartBeat());
+                leaderActor.onReceiveCommand(SendHeartBeat.INSTANCE);
                 assertEquals(5, leaderActor.getReplicatedLog().size());
                 assertEquals(String.format("expected to be Leader but was %s. Current Leader = %s ",
                         leaderActor.getCurrentBehavior().state(), leaderActor.getLeaderId())
@@ -760,7 +849,7 @@ public class RaftActorTest extends AbstractActorTest {
 
 
                 //reply from a slow follower does not initiate a fake snapshot
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 9, 1, (short)0));
                 assertEquals("Fake snapshot should not happen when Initiate is in progress", 5, leaderActor.getReplicatedLog().size());
 
                 ByteString snapshotBytes = fromObject(Arrays.asList(
@@ -770,12 +859,12 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockPayload("foo-3"),
                         new MockRaftActorContext.MockPayload("foo-4")));
                 leaderActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
-                assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+                assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
 
                 assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0, leaderActor.getReplicatedLog().size());
 
                 //reply from a slow follower after should not raise errors
-                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1));
+                leaderActor.onReceiveCommand(new AppendEntriesReply(follower2Id, 1, true, 5, 1, (short) 0));
                 assertEquals(0, leaderActor.getReplicatedLog().size());
             }
         };
@@ -792,11 +881,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
-            Map<String, String> peerAddresses = new HashMap<>();
+            Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses,
-                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                    MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
             MockRaftActor leaderActor = mockActorRef.underlyingActor();
             leaderActor.getRaftActorContext().setCommitIndex(3);
@@ -814,15 +902,16 @@ public class RaftActorTest extends AbstractActorTest {
             leaderActor.setCurrentBehavior(leader);
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
-            // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            // Simulate an install snaphost to a follower.
+            leaderActor.getRaftActorContext().getSnapshotManager().captureToInstall(
+                    leaderActor.getReplicatedLog().last(), -1, "member1");
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
 
             // Trimming log in this scenario is a no-op
             assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+            assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(-1, leader.getReplicatedToAllIndex());
 
         }};
@@ -839,11 +928,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
 
-            Map<String, String> peerAddresses = new HashMap<>();
+            Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
 
             TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
-                    MockRaftActor.props(persistenceId, peerAddresses,
-                            Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+                    MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
 
             MockRaftActor leaderActor = mockActorRef.underlyingActor();
             leaderActor.getRaftActorContext().setCommitIndex(3);
@@ -858,19 +946,95 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
 
             // Persist another entry (this will cause a CaptureSnapshot to be triggered
-            leaderActor.persistData(mockActorRef, "x", new MockRaftActorContext.MockPayload("duh"));
+            leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
+                new MockRaftActorContext.MockPayload("duh"));
 
             // Now send a CaptureSnapshotReply
             mockActorRef.tell(new CaptureSnapshotReply(fromObject("foo").toByteArray()), mockActorRef);
 
             // Trimming log in this scenario is a no-op
             assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
-            assertFalse(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
+            assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
             assertEquals(3, leader.getReplicatedToAllIndex());
 
         }};
     }
 
+    @Test
+    public void testRaftActorOnRecoverySnapshot() throws Exception {
+        TEST_LOG.info("testRaftActorOnRecoverySnapshot");
+
+        new JavaTestKit(getSystem()) {{
+                String persistenceId = factory.generateActorId("follower-");
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                // Set the heartbeat interval high to essentially disable election otherwise the test
+                // may fail if the actor is switched to Leader
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
+
+                // Create mock ReplicatedLogEntry
+                ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
+                        new MockRaftActorContext.MockPayload("F", 1));
+
+                InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
+
+                TestActorRef<MockRaftActor> ref = factory.createTestActor(
+                        MockRaftActor.props(persistenceId, peerAddresses, config));
+
+                MockRaftActor mockRaftActor = ref.underlyingActor();
+
+                mockRaftActor.waitForRecoveryComplete();
+
+                mockRaftActor.waitForInitializeBehaviorComplete();
+
+                verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+            }};
+    }
+
+    @Test
+    public void testSwitchBehavior(){
+        String persistenceId = factory.generateActorId("leader-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass("org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+        config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
+        config.setSnapshotBatchCount(5);
+
+        DataPersistenceProvider dataPersistenceProvider = new NonPersistentDataProvider();
+
+        Map<String, String> peerAddresses = ImmutableMap.<String, String>builder().build();
+
+        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(
+                MockRaftActor.props(persistenceId, peerAddresses, config, dataPersistenceProvider), persistenceId);
+
+        MockRaftActor leaderActor = mockActorRef.underlyingActor();
+
+        leaderActor.waitForRecoveryComplete();
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Follower, 100));
+
+        assertEquals(100, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Follower, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Leader, 110));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.Candidate, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+
+        leaderActor.handleCommand(new SwitchBehavior(RaftState.IsolatedLeader, 125));
+
+        assertEquals(110, leaderActor.getRaftActorContext().getTermInformation().getCurrentTerm());
+        assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
+    }
+
     public static ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
@@ -891,4 +1055,316 @@ public class RaftActorTest extends AbstractActorTest {
         }
     }
 
+    @Test
+    public void testUpdateConfigParam() throws Exception {
+        DefaultConfigParamsImpl emptyConfig = new DefaultConfigParamsImpl();
+        String persistenceId = factory.generateActorId("follower-");
+        ImmutableMap<String, String> peerAddresses =
+            ImmutableMap.<String, String>builder().put("member1", "address").build();
+        DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+        TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
+                MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
+        MockRaftActor mockRaftActor = actorRef.underlyingActor();
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        RaftActorBehavior behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(emptyConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        DefaultConfigParamsImpl disableConfig = new DefaultConfigParamsImpl();
+        disableConfig.setCustomRaftPolicyImplementationClass(
+            "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
+        mockRaftActor.updateConfigParams(disableConfig);
+        assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(disableConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        DefaultConfigParamsImpl defaultConfig = new DefaultConfigParamsImpl();
+        defaultConfig.setCustomRaftPolicyImplementationClass(
+            "org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy");
+        mockRaftActor.updateConfigParams(defaultConfig);
+        assertNotSame("Different Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+
+        behavior = mockRaftActor.getCurrentBehavior();
+        mockRaftActor.updateConfigParams(defaultConfig);
+        assertSame("Same Behavior", behavior, mockRaftActor.getCurrentBehavior());
+        assertEquals("Behavior State", RaftState.Follower,
+            mockRaftActor.getCurrentBehavior().state());
+    }
+
+    @Test
+    public void testGetSnapshot() throws Exception {
+        TEST_LOG.info("testGetSnapshot starting");
+
+        JavaTestKit kit = new JavaTestKit(getSystem());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        long term = 3;
+        long seqN = 1;
+        InMemoryJournal.addEntry(persistenceId, seqN++, new UpdateElectionTerm(term, "member-1"));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 0,
+                new MockRaftActorContext.MockPayload("A")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 1,
+                new MockRaftActorContext.MockPayload("B")));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new ApplyJournalEntries(1));
+        InMemoryJournal.addEntry(persistenceId, seqN++, new MockRaftActorContext.MockReplicatedLogEntry(term, 2,
+                new MockRaftActorContext.MockPayload("C")));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
+                ImmutableMap.<String, String>builder().put("member1", "address").build(), config).
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        // Wait for snapshot after recovery
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
+
+        mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+
+        ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture());
+
+        byte[] stateSnapshot = new byte[]{1,2,3};
+        replyActor.getValue().tell(new CaptureSnapshotReply(stateSnapshot), ActorRef.noSender());
+
+        GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
+
+        assertEquals("getId", persistenceId, reply.getId());
+        Snapshot replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", 1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", term, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", 2L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", term, replySnapshot.getLastTerm());
+        assertArrayEquals("getState", stateSnapshot, replySnapshot.getState());
+        assertEquals("getUnAppliedEntries size", 1, replySnapshot.getUnAppliedEntries().size());
+        assertEquals("UnApplied entry index ", 2L, replySnapshot.getUnAppliedEntries().get(0).getIndex());
+
+        // Test with timeout
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
+        assertEquals("Failure cause type", TimeoutException.class, failure.cause().getClass());
+
+        mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(30, TimeUnit.SECONDS));
+
+        // Test with persistence disabled.
+
+        mockRaftActor.setPersistence(false);
+        reset(mockRaftActor.snapshotCohortDelegate);
+
+        raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
+        reply = kit.expectMsgClass(GetSnapshotReply.class);
+        verify(mockRaftActor.snapshotCohortDelegate, never()).createSnapshot(any(ActorRef.class));
+
+        assertEquals("getId", persistenceId, reply.getId());
+        replySnapshot = SerializationUtils.deserialize(reply.getSnapshot());
+        assertEquals("getElectionTerm", term, replySnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", "member-1", replySnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", -1L, replySnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", -1L, replySnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", -1L, replySnapshot.getLastIndex());
+        assertEquals("getLastTerm", -1L, replySnapshot.getLastTerm());
+        assertEquals("getState length", 0, replySnapshot.getState().length);
+        assertEquals("getUnAppliedEntries size", 0, replySnapshot.getUnAppliedEntries().size());
+
+        TEST_LOG.info("testGetSnapshot ending");
+    }
+
+    @Test
+    public void testRestoreFromSnapshot() throws Exception {
+        TEST_LOG.info("testRestoreFromSnapshot starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        List<ReplicatedLogEntry> snapshotUnappliedEntries = new ArrayList<>();
+        snapshotUnappliedEntries.add(new MockRaftActorContext.MockReplicatedLogEntry(1, 4,
+                new MockRaftActorContext.MockPayload("E")));
+
+        int snapshotLastApplied = 3;
+        int snapshotLastIndex = 4;
+
+        List<MockPayload> state = Arrays.asList(
+                new MockRaftActorContext.MockPayload("A"),
+                new MockRaftActorContext.MockPayload("B"),
+                new MockRaftActorContext.MockPayload("C"),
+                new MockRaftActorContext.MockPayload("D"));
+        ByteString stateBytes = fromObject(state);
+
+        Snapshot snapshot = Snapshot.create(stateBytes.toByteArray(), snapshotUnappliedEntries,
+                snapshotLastIndex, 1, snapshotLastApplied, 1, 1, "member-1");
+
+        InMemorySnapshotStore.addSnapshotSavedLatch(persistenceId);
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        Snapshot savedSnapshot = InMemorySnapshotStore.waitForSavedSnapshot(persistenceId, Snapshot.class);
+        assertEquals("getElectionTerm", snapshot.getElectionTerm(), savedSnapshot.getElectionTerm());
+        assertEquals("getElectionVotedFor", snapshot.getElectionVotedFor(), savedSnapshot.getElectionVotedFor());
+        assertEquals("getLastAppliedIndex", snapshot.getLastAppliedIndex(), savedSnapshot.getLastAppliedIndex());
+        assertEquals("getLastAppliedTerm", snapshot.getLastAppliedTerm(), savedSnapshot.getLastAppliedTerm());
+        assertEquals("getLastIndex", snapshot.getLastIndex(), savedSnapshot.getLastIndex());
+        assertEquals("getLastTerm", snapshot.getLastTerm(), savedSnapshot.getLastTerm());
+        assertArrayEquals("getState", snapshot.getState(), savedSnapshot.getState());
+        assertEquals("getUnAppliedEntries", snapshot.getUnAppliedEntries(), savedSnapshot.getUnAppliedEntries());
+
+        verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).applySnapshot(any(byte[].class));
+
+        RaftActorContext context = mockRaftActor.getRaftActorContext();
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", snapshotLastIndex, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", snapshotLastApplied, context.getLastApplied());
+        assertEquals("Commit index", snapshotLastApplied, context.getCommitIndex());
+        assertEquals("Recovered state", state, mockRaftActor.getState());
+        assertEquals("Current term", 1L, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
+
+        // Test with data persistence disabled
+
+        snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+                -1, -1, -1, -1, 5, "member-1");
+
+        persistenceId = factory.generateActorId("test-actor-");
+
+        raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).
+                persistent(Optional.of(Boolean.FALSE)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+        assertEquals("snapshot committed", true,
+                Uninterruptibles.awaitUninterruptibly(mockRaftActor.snapshotCommitted, 5, TimeUnit.SECONDS));
+
+        context = mockRaftActor.getRaftActorContext();
+        assertEquals("Current term", 5L, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", "member-1", context.getTermInformation().getVotedFor());
+
+        TEST_LOG.info("testRestoreFromSnapshot ending");
+    }
+
+    @Test
+    public void testRestoreFromSnapshotWithRecoveredData() throws Exception {
+        TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData starting");
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        List<MockPayload> state = Arrays.asList(new MockRaftActorContext.MockPayload("A"));
+        Snapshot snapshot = Snapshot.create(fromObject(state).toByteArray(), Arrays.<ReplicatedLogEntry>asList(),
+                5, 2, 5, 2, 2, "member-1");
+
+        InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new MockRaftActorContext.MockPayload("B")));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).restoreFromSnapshot(SerializationUtils.serialize(snapshot)).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForRecoveryComplete();
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(mockRaftActor.snapshotCohortDelegate, never()).applySnapshot(any(byte[].class));
+
+        RaftActorContext context = mockRaftActor.getRaftActorContext();
+        assertEquals("Journal log size", 1, context.getReplicatedLog().size());
+        assertEquals("Last index", 0, context.getReplicatedLog().lastIndex());
+        assertEquals("Last applied", -1, context.getLastApplied());
+        assertEquals("Commit index", -1, context.getCommitIndex());
+        assertEquals("Current term", 0, context.getTermInformation().getCurrentTerm());
+        assertEquals("Voted for", null, context.getTermInformation().getVotedFor());
+
+        TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
+    }
+
+    @Test
+    public void testNonVotingOnRecovery() throws Exception {
+        TEST_LOG.info("testNonVotingOnRecovery starting");
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setElectionTimeoutFactor(1);
+        config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
+
+        String persistenceId = factory.generateActorId("test-actor-");
+        InMemoryJournal.addEntry(persistenceId, 1,  new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+                new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        // Sleep a bit and verify it didn't get an election timeout and schedule an election.
+
+        Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+        assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
+
+        TEST_LOG.info("testNonVotingOnRecovery ending");
+    }
+
+    @Test
+    public void testLeaderTransitioning() throws Exception {
+        TEST_LOG.info("testLeaderTransitioning starting");
+
+        TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                Props.create(MessageCollectorActor.class));
+
+        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+        config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+        String persistenceId = factory.generateActorId("test-actor-");
+
+        TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+                config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+        MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+        mockRaftActor.waitForInitializeBehaviorComplete();
+
+        raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
+                0L, -1L, (short)1), ActorRef.noSender());
+        LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                notifierActor, LeaderStateChanged.class);
+        assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+
+        MessageCollectorActor.clearMessages(notifierActor);
+
+        raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
+
+        leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+        assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
+        assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+
+        TEST_LOG.info("testLeaderTransitioning ending");
+    }
 }