Implement change to voting with no leader 23/32723/12
authorTom Pantelis <tpanteli@brocade.com>
Wed, 13 Jan 2016 21:17:28 +0000 (16:17 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 22 Mar 2016 21:28:05 +0000 (17:28 -0400)
Implemented a special case where on a voting state change from
non-voting to voting, if there's no leader, it will try to elect a
leader in order to apply the change and progress.

This is to handle a use case where one has 2 geographically-separated
3-node clusters, one a primary and the other a backup such that if the
primary cluster is lost, the backup can take over. In this scenario,
there's a logical 6-node cluster where the primary sub-cluster is
configured as voting and the backup sub-cluster as non-voting such
that the primary cluster can  make progress without consensus from
the backup cluster while still replicating to the backup. On fail-over
to the backup, a request would be sent to a member of the backup
cluster to flip the voting states, ie make the backup sub-cluster
voting and the lost primary non-voting. However since the primary
majority cluster is lost, there would be no leader to apply, persist and
replicate the server config change.

Therefore, if the server processing the request is currently non-voting
and is to be changed to voting and there is no current leader, it will
try to elect itself the leader by applying the new server config change in
the RaftActorContext and sending an ElectionTimeout. If it's elected
leader, it persists and replicates the new server config. If no leader
change occurs within the election timeout period, it reverts the server
config change and tries to forward the change request to another server
with the same voting state change. In this manner, the intent is to elect
the newly voting server that has the most up to date log.

Change-Id: I67b5b2d3a97745dbe9a8215f9a28f3a840f2a0db
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/ChangeServersVotingStatus.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java

index 304d58b..a36807a 100644 (file)
@@ -202,6 +202,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
+        if(getCurrentBehavior() != null) {
+            try {
+                getCurrentBehavior().close();
+            } catch(Exception e) {
+                LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
+            }
+        }
+
         reusableBehaviorStateHolder.init(getCurrentBehavior());
         setCurrentBehavior(newBehavior);
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
@@ -247,7 +255,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
+                LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
             }
 
             persistence().persist(applyEntries, NoopProcedure.instance());
@@ -445,6 +453,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if(leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
+
+            serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
         }
 
         if (roleChangeNotifier.isPresent() &&
index e78f39c..0902081 100644 (file)
@@ -13,14 +13,16 @@ import akka.actor.Cancellable;
 import com.google.common.base.Preconditions;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort.OnComplete;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.messages.AddServer;
@@ -35,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSn
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Handles server configuration related messages for a RaftActor.
@@ -85,11 +88,34 @@ class RaftActorServerConfigurationSupport {
         }
     }
 
+    void onNewLeader(String leaderId) {
+        currentOperationState.onNewLeader(leaderId);
+    }
+
     private void onChangeServersVotingStatus(ChangeServersVotingStatus message, ActorRef sender) {
         LOG.debug("{}: onChangeServersVotingStatus: {}, state: {}", raftContext.getId(), message,
                 currentOperationState);
 
-        onNewOperation(new ChangeServersVotingStatusContext(message, sender));
+        // The following check is a special case. Normally we fail an operation if there's no leader.
+        // Consider a scenario where one has 2 geographically-separated 3-node clusters, one a primary and
+        // the other a backup such that if the primary cluster is lost, the backup can take over. In this
+        // scenario, we have a logical 6-node cluster where the primary sub-cluster is configured as voting
+        // and the backup sub-cluster as non-voting such that the primary cluster can make progress without
+        // consensus from the backup cluster while still replicating to the backup. On fail-over to the backup,
+        // a request would be sent to a member of the backup cluster to flip the voting states, ie make the
+        // backup sub-cluster voting and the lost primary non-voting. However since the primary majority
+        // cluster is lost, there would be no leader to apply, persist and replicate the server config change.
+        // Therefore, if the local server is currently non-voting and is to be changed to voting and there is
+        // no current leader, we will try to elect a leader using the new server config in order to replicate
+        // the change and progress.
+        boolean localServerChangingToVoting = Boolean.TRUE.equals(message.
+                getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
+        boolean hasNoLeader = raftActor.getLeaderId() == null;
+        if(localServerChangingToVoting && !raftContext.isVotingMember() && hasNoLeader) {
+            currentOperationState.onNewOperation(new ChangeServersVotingStatusContext(message, sender, true));
+        } else {
+            onNewOperation(new ChangeServersVotingStatusContext(message, sender, false));
+        }
     }
 
     private void onRemoveServer(RemoveServer removeServer, ActorRef sender) {
@@ -152,7 +178,7 @@ class RaftActorServerConfigurationSupport {
             ActorSelection leader = raftActor.getLeader();
             if (leader != null) {
                 LOG.debug("{}: Not leader - forwarding to leader {}", raftContext.getId(), leader);
-                leader.forward(operationContext.getOperation(), raftActor.getContext());
+                leader.tell(operationContext.getOperation(), operationContext.getClientRequestor());
             } else {
                 LOG.debug("{}: No leader - returning NO_LEADER reply", raftContext.getId());
                 operationContext.getClientRequestor().tell(operationContext.newReply(
@@ -197,6 +223,9 @@ class RaftActorServerConfigurationSupport {
 
         }
 
+        void onNewLeader(String newLeader) {
+        }
+
         protected void persistNewServerConfiguration(ServerOperationContext<?> operationContext){
             raftContext.setDynamicServerConfigurationInUse();
 
@@ -219,6 +248,10 @@ class RaftActorServerConfigurationSupport {
 
             operationContext.operationComplete(raftActor, replyStatus == null || replyStatus == ServerChangeStatus.OK);
 
+            changeToIdleState();
+        }
+
+        protected void changeToIdleState() {
             currentOperationState = IDLE;
 
             ServerOperationContext<?> nextOperation = pendingOperationsQueue.poll();
@@ -235,8 +268,12 @@ class RaftActorServerConfigurationSupport {
         }
 
         Cancellable newTimer(Object message) {
+            return newTimer(raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), message);
+        }
+
+        Cancellable newTimer(FiniteDuration timeout, Object message) {
             return raftContext.getActorSystem().scheduler().scheduleOnce(
-                    raftContext.getConfigParams().getElectionTimeOutInterval().$times(2), raftContext.getActor(), message,
+                    timeout, raftContext.getActor(), message,
                             raftContext.getActorSystem().dispatcher(), raftContext.getActor());
         }
 
@@ -617,13 +654,17 @@ class RaftActorServerConfigurationSupport {
     }
 
     private static class ChangeServersVotingStatusContext extends ServerOperationContext<ChangeServersVotingStatus> {
-        ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor) {
+        private final boolean tryToElectLeader;
+
+        ChangeServersVotingStatusContext(ChangeServersVotingStatus convertMessage, ActorRef clientRequestor,
+                boolean tryToElectLeader) {
             super(convertMessage, clientRequestor);
+            this.tryToElectLeader = tryToElectLeader;
         }
 
         @Override
         InitialOperationState newInitialOperationState(RaftActorServerConfigurationSupport support) {
-            return support.new ChangeServersVotingStatusState(this);
+            return support.new ChangeServersVotingStatusState(this, tryToElectLeader);
         }
 
         @Override
@@ -638,7 +679,7 @@ class RaftActorServerConfigurationSupport {
             boolean localServerChangedToNonVoting = Boolean.FALSE.equals(getOperation().
                     getServerVotingStatusMap().get(raftActor.getRaftActorContext().getId()));
             if(succeeded && localServerChangedToNonVoting && raftActor.isLeader()) {
-                raftActor.initiateLeadershipTransfer(new OnComplete() {
+                raftActor.initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                     @Override
                     public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
                         LOG.debug("{}: leader transfer succeeded after change to non-voting", raftActor.persistenceId());
@@ -664,21 +705,55 @@ class RaftActorServerConfigurationSupport {
 
         @Override
         String getLoggingContext() {
-            return getOperation().getServerVotingStatusMap().toString();
+            return getOperation().toString();
         }
     }
 
     private class ChangeServersVotingStatusState extends OperationState implements InitialOperationState {
         private final ChangeServersVotingStatusContext changeVotingStatusContext;
+        private final boolean tryToElectLeader;
 
-        ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext) {
+        ChangeServersVotingStatusState(ChangeServersVotingStatusContext changeVotingStatusContext,
+                boolean tryToElectLeader) {
             this.changeVotingStatusContext = changeVotingStatusContext;
+            this.tryToElectLeader = tryToElectLeader;
         }
 
         @Override
         public void initiate() {
             LOG.debug("Initiating ChangeServersVotingStatusState");
 
+            if(tryToElectLeader) {
+                initiateLocalLeaderElection();
+            } else {
+                updateLocalPeerInfo();
+
+                persistNewServerConfiguration(changeVotingStatusContext);
+            }
+        }
+
+        private void initiateLocalLeaderElection() {
+            LOG.debug("{}: Sending local ElectionTimeout to start leader election", raftContext.getId());
+
+            ServerConfigurationPayload previousServerConfig = raftContext.getPeerServerInfo(true);
+            updateLocalPeerInfo();
+
+            raftContext.getActor().tell(ElectionTimeout.INSTANCE, raftContext.getActor());
+
+            currentOperationState = new WaitingForLeaderElected(changeVotingStatusContext, previousServerConfig);
+        }
+
+        private void updateLocalPeerInfo() {
+            List<ServerInfo> newServerInfoList = newServerInfoList();
+
+            raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
+            if(raftActor.getCurrentBehavior() instanceof AbstractLeader) {
+                AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
+                leader.updateMinReplicaCount();
+            }
+        }
+
+        private List<ServerInfo> newServerInfoList() {
             Map<String, Boolean> serverVotingStatusMap = changeVotingStatusContext.getOperation().getServerVotingStatusMap();
             List<ServerInfo> newServerInfoList = new ArrayList<>();
             for(String peerId: raftContext.getPeerIds()) {
@@ -689,11 +764,90 @@ class RaftActorServerConfigurationSupport {
             newServerInfoList.add(new ServerInfo(raftContext.getId(), serverVotingStatusMap.containsKey(
                     raftContext.getId()) ? serverVotingStatusMap.get(raftContext.getId()) : raftContext.isVotingMember()));
 
-            raftContext.updatePeerIds(new ServerConfigurationPayload(newServerInfoList));
-            AbstractLeader leader = (AbstractLeader) raftActor.getCurrentBehavior();
-            leader.updateMinReplicaCount();
+            return newServerInfoList;
+        }
+    }
 
-            persistNewServerConfiguration(changeVotingStatusContext);
+    private class WaitingForLeaderElected extends OperationState {
+        private final ServerConfigurationPayload previousServerConfig;
+        private final ChangeServersVotingStatusContext operationContext;
+        private final Cancellable timer;
+
+        WaitingForLeaderElected(ChangeServersVotingStatusContext operationContext,
+                ServerConfigurationPayload previousServerConfig) {
+            this.operationContext = operationContext;
+            this.previousServerConfig = previousServerConfig;
+
+            timer = newTimer(raftContext.getConfigParams().getElectionTimeOutInterval(),
+                    new ServerOperationTimeout(operationContext.getLoggingContext()));
+        }
+
+        @Override
+        void onNewLeader(String newLeader) {
+            LOG.debug("{}: New leader {} elected", raftContext.getId(), newLeader);
+
+            timer.cancel();
+
+            if(raftActor.isLeader()) {
+                persistNewServerConfiguration(operationContext);
+            } else {
+                // Edge case - some other node became leader so forward the operation.
+                LOG.debug("{}: Forwarding {} to new leader", raftContext.getId(), operationContext.getOperation());
+
+                // Revert the local server config change.
+                raftContext.updatePeerIds(previousServerConfig);
+
+                changeToIdleState();
+                RaftActorServerConfigurationSupport.this.onNewOperation(operationContext);
+            }
+        }
+
+        @Override
+        void onServerOperationTimeout(ServerOperationTimeout timeout) {
+            LOG.warn("{}: Leader election timed out - cannot apply operation {}",
+                    raftContext.getId(), timeout.getLoggingContext());
+
+            // Revert the local server config change.
+            raftContext.updatePeerIds(previousServerConfig);
+            raftActor.initializeBehavior();
+
+            tryToForwardOperationToAnotherServer();
+        }
+
+        private void tryToForwardOperationToAnotherServer() {
+            Collection<String> serversVisited = new HashSet<>(operationContext.getOperation().getServersVisited());
+
+            LOG.debug("{}: tryToForwardOperationToAnotherServer - servers already visited {}", raftContext.getId(),
+                    serversVisited);
+
+            serversVisited.add(raftContext.getId());
+
+            // Try to find another whose state is being changed from non-voting to voting and that we haven't
+            // tried yet.
+            Map<String, Boolean> serverVotingStatusMap = operationContext.getOperation().getServerVotingStatusMap();
+            ActorSelection forwardToPeerActor = null;
+            for(Map.Entry<String, Boolean> e: serverVotingStatusMap.entrySet()) {
+                Boolean isVoting = e.getValue();
+                String serverId = e.getKey();
+                PeerInfo peerInfo = raftContext.getPeerInfo(serverId);
+                if(isVoting && peerInfo != null && !peerInfo.isVoting() && !serversVisited.contains(serverId)) {
+                    ActorSelection actor = raftContext.getPeerActorSelection(serverId);
+                    if(actor != null) {
+                        forwardToPeerActor = actor;
+                        break;
+                    }
+                }
+            }
+
+            if(forwardToPeerActor != null) {
+                LOG.debug("{}: Found server {} to forward to", raftContext.getId(), forwardToPeerActor);
+
+                forwardToPeerActor.tell(new ChangeServersVotingStatus(serverVotingStatusMap, serversVisited),
+                        operationContext.getClientRequestor());
+                changeToIdleState();
+            } else {
+                operationComplete(operationContext, ServerChangeStatus.NO_LEADER);
+            }
         }
     }
 
index 0f46012..3dc3693 100644 (file)
@@ -8,7 +8,10 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -22,17 +25,31 @@ public class ChangeServersVotingStatus implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final Map<String, Boolean> serverVotingStatusMap;
+    private final Collection<String> serversVisited;
 
     public ChangeServersVotingStatus(@Nonnull Map<String, Boolean> serverVotingStatusMap) {
+        this(serverVotingStatusMap, Collections.emptySet());
+    }
+
+    public ChangeServersVotingStatus(@Nonnull Map<String, Boolean> serverVotingStatusMap,
+            @Nonnull Collection<String> serversVisited) {
         this.serverVotingStatusMap = new HashMap<>(Preconditions.checkNotNull(serverVotingStatusMap));
+        this.serversVisited = ImmutableSet.copyOf(Preconditions.checkNotNull(serversVisited));
     }
 
+    @Nonnull
     public Map<String, Boolean> getServerVotingStatusMap() {
         return serverVotingStatusMap;
     }
 
+    @Nonnull
+    public Collection<String> getServersVisited() {
+        return serversVisited;
+    }
+
     @Override
     public String toString() {
-        return "ChangeServersVotingStatus [serverVotingStatusMap=" + serverVotingStatusMap + "]";
+        return "ChangeServersVotingStatus [serverVotingStatusMap=" + serverVotingStatusMap
+                + (serversVisited != null ? ", serversVisited=" + serversVisited : "") + "]";
     }
 }
index 3bf328b..bdfb2b2 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -32,13 +33,16 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -50,6 +54,7 @@ import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStat
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
@@ -75,7 +80,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     static final String NEW_SERVER_ID = "new-server";
     static final String NEW_SERVER_ID2 = "new-server2";
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorServerConfigurationSupportTest.class);
-    private static final DataPersistenceProvider NO_PERSISTENCE = new NonPersistentDataProvider();
+    private static final boolean NO_PERSISTENCE = false;
+    private static final boolean PERSISTENT = true;
 
     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
 
@@ -93,7 +99,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
     public void setup() {
         InMemoryJournal.clear();
         InMemorySnapshotStore.clear();
+    }
 
+    private void setupNewFollower() {
         DefaultConfigParamsImpl configParams = newFollowerConfigParams();
 
         newFollowerCollectorActor = actorFactory.createTestActor(
@@ -125,6 +133,8 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
     @Test
     public void testAddServerWithExistingFollower() throws Exception {
+        LOG.info("testAddServerWithExistingFollower starting");
+        setupNewFollower();
         RaftActorContext followerActorContext = newFollowerContext(FOLLOWER_ID, followerActor);
         followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(
                 0, 3, 1).build());
@@ -209,10 +219,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("New follower ReplicatedLogImplEntry getIndex", 3, logEntry.getIndex());
         assertEquals("New follower ReplicatedLogImplEntry getData", ServerConfigurationPayload.class,
                 logEntry.getData().getClass());
+
+        LOG.info("testAddServerWithExistingFollower ending");
     }
 
     @Test
     public void testAddServerWithNoExistingFollower() throws Exception {
+        LOG.info("testAddServerWithNoExistingFollower starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
         initialActorContext.setCommitIndex(1);
         initialActorContext.setLastApplied(1);
@@ -261,10 +276,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // Verify new server config was applied in the new follower
 
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID), newFollowerActorContext.getPeerIds());
+
+        LOG.info("testAddServerWithNoExistingFollower ending");
     }
 
     @Test
     public void testAddServersAsNonVoting() throws Exception {
+        LOG.info("testAddServersAsNonVoting starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -326,10 +346,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader last applied index", 1, leaderActorContext.getLastApplied());
         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(),
                 votingServer(LEADER_ID), nonVotingServer(NEW_SERVER_ID), nonVotingServer(NEW_SERVER_ID2));
+
+        LOG.info("testAddServersAsNonVoting ending");
     }
 
     @Test
     public void testAddServerWithOperationInProgress() throws Exception {
+        LOG.info("testAddServerWithOperationInProgress starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -384,10 +409,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         expectMatching(newFollowerCollectorActor, ApplyState.class, 2);
         assertEquals("New follower peers", Sets.newHashSet(LEADER_ID, NEW_SERVER_ID2),
                newFollowerActorContext.getPeerIds());
+
+        LOG.info("testAddServerWithOperationInProgress ending");
     }
 
     @Test
     public void testAddServerWithPriorSnapshotInProgress() throws Exception {
+        LOG.info("testAddServerWithPriorSnapshotInProgress starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -426,10 +456,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader last applied index", 0, leaderActorContext.getLastApplied());
         verifyServerConfigurationPayloadEntry(leaderActorContext.getReplicatedLog(), votingServer(LEADER_ID),
                 votingServer(NEW_SERVER_ID));
+
+        LOG.info("testAddServerWithPriorSnapshotInProgress ending");
     }
 
     @Test
     public void testAddServerWithPriorSnapshotCompleteTimeout() throws Exception {
+        LOG.info("testAddServerWithPriorSnapshotCompleteTimeout starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -453,10 +488,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getStatus", ServerChangeStatus.TIMEOUT, addServerReply.getStatus());
 
         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+
+        LOG.info("testAddServerWithPriorSnapshotCompleteTimeout ending");
     }
 
     @Test
     public void testAddServerWithLeaderChangeBeforePriorSnapshotComplete() throws Exception {
+        LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -497,10 +537,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
         assertEquals("isCapturing", false, leaderActorContext.getSnapshotManager().isCapturing());
+
+        LOG.info("testAddServerWithLeaderChangeBeforePriorSnapshotComplete ending");
     }
 
     @Test
     public void testAddServerWithLeaderChangeDuringInstallSnapshot() throws Exception {
+        LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -537,10 +582,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
 
         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
+
+        LOG.info("testAddServerWithLeaderChangeDuringInstallSnapshot ending");
     }
 
     @Test
     public void testAddServerWithInstallSnapshotTimeout() throws Exception {
+        LOG.info("testAddServerWithInstallSnapshotTimeout starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -565,26 +615,37 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Leader peers size", 0, leaderActorContext.getPeerIds().size());
         assertEquals("Leader followers size", 0,
                 ((AbstractLeader)leaderRaftActor.getCurrentBehavior()).getFollowerIds().size());
+
+        LOG.info("testAddServerWithInstallSnapshotTimeout ending");
     }
 
     @Test
     public void testAddServerWithNoLeader() {
+        LOG.info("testAddServerWithNoLeader starting");
+
+        setupNewFollower();
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
-                MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+                        followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+                        props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(LEADER_ID));
         noLeaderActor.underlyingActor().waitForInitializeBehaviorComplete();
 
         noLeaderActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, addServerReply.getStatus());
+
+        LOG.info("testAddServerWithNoLeader ending");
     }
 
     @Test
     public void testAddServerWithNoConsensusReached() {
+        LOG.info("testAddServerWithNoConsensusReached starting");
+
+        setupNewFollower();
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -638,10 +699,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         leaderActor.tell(new AddServer(NEW_SERVER_ID2, "", false), testKit.getRef());
         addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.PRIOR_REQUEST_CONSENSUS_TIMEOUT, addServerReply.getStatus());
+
+        LOG.info("testAddServerWithNoConsensusReached ending");
     }
 
     @Test
     public void testAddServerWithExistingServer() {
+        LOG.info("testAddServerWithExistingServer starting");
+
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -653,10 +718,15 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         AddServerReply addServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), AddServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.ALREADY_EXISTS, addServerReply.getStatus());
+
+        LOG.info("testAddServerWithExistingServer ending");
     }
 
     @Test
     public void testAddServerForwardedToLeader() {
+        LOG.info("testAddServerForwardedToLeader starting");
+
+        setupNewFollower();
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
@@ -665,8 +735,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 actorFactory.generateActorId(LEADER_ID));
 
         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
-                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
+                        leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+                        props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(FOLLOWER_ID));
         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
 
@@ -675,15 +746,20 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         followerRaftActor.tell(new AddServer(NEW_SERVER_ID, newFollowerRaftActor.path().toString(), true), testKit.getRef());
         expectFirstMatching(leaderActor, AddServer.class);
+
+        LOG.info("testAddServerForwardedToLeader ending");
     }
 
     @Test
     public void testOnApplyState() {
+        LOG.info("testOnApplyState starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         TestActorRef<MockRaftActor> noLeaderActor = actorFactory.createTestActor(
-                MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+                        followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+                        props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(LEADER_ID));
 
         RaftActorServerConfigurationSupport support = new RaftActorServerConfigurationSupport(noLeaderActor.underlyingActor());
@@ -697,26 +773,35 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 new MockRaftActorContext.MockPayload("1"));
         handled = support.handleMessage(new ApplyState(null, null, nonServerConfigEntry), ActorRef.noSender());
         assertEquals("Message handled", false, handled);
+
+        LOG.info("testOnApplyState ending");
     }
 
     @Test
     public void testRemoveServerWithNoLeader() {
+        LOG.info("testRemoveServerWithNoLeader starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
         TestActorRef<MockRaftActor> leaderActor = actorFactory.createTestActor(
-                MockRaftActor.props(LEADER_ID, ImmutableMap.of(FOLLOWER_ID, followerActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                MockRaftActor.builder().id(LEADER_ID).peerAddresses(ImmutableMap.of(FOLLOWER_ID,
+                        followerActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+                        props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(LEADER_ID));
         leaderActor.underlyingActor().waitForInitializeBehaviorComplete();
 
         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.NO_LEADER, removeServerReply.getStatus());
+
+        LOG.info("testRemoveServerWithNoLeader ending");
     }
 
     @Test
     public void testRemoveServerNonExistentServer() {
+        LOG.info("testRemoveServerNonExistentServer starting");
+
         RaftActorContext initialActorContext = new MockRaftActorContext();
 
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
@@ -727,10 +812,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         leaderActor.tell(new RemoveServer(NEW_SERVER_ID), testKit.getRef());
         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.DOES_NOT_EXIST, removeServerReply.getStatus());
+
+        LOG.info("testRemoveServerNonExistentServer ending");
     }
 
     @Test
     public void testRemoveServerForwardToLeader() {
+        LOG.info("testRemoveServerForwardToLeader starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
@@ -739,8 +828,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 actorFactory.generateActorId(LEADER_ID));
 
         TestActorRef<MockRaftActor> followerRaftActor = actorFactory.createTestActor(
-                MockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                MockRaftActor.builder().id(FOLLOWER_ID).peerAddresses(ImmutableMap.of(LEADER_ID,
+                        leaderActor.path().toString())).config(configParams).persistent(Optional.of(false)).
+                        props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId(FOLLOWER_ID));
         followerRaftActor.underlyingActor().waitForInitializeBehaviorComplete();
 
@@ -749,10 +839,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         followerRaftActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
         expectFirstMatching(leaderActor, RemoveServer.class);
+
+        LOG.info("testRemoveServerForwardToLeader ending");
     }
 
     @Test
     public void testRemoveServer() {
+        LOG.info("testRemoveServer starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
@@ -768,16 +862,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
 
+        TestActorRef<MessageCollectorActor> collector =
+                actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        actorFactory.generateActorId("collector"));
         TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        configParams, NO_PERSISTENCE, collector).withDispatcher(Dispatchers.DefaultDispatcherId()),
                 followerActorId);
 
-        TestActorRef<MessageCollectorActor> collector =
-                actorFactory.createTestActor(MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
-
-        followerRaftActor.underlyingActor().setCollectorActor(collector);
-
         leaderActor.tell(new RemoveServer(FOLLOWER_ID), testKit.getRef());
         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, removeServerReply.getStatus());
@@ -791,10 +883,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         assertEquals("Follower ids size", 0, ((Leader)currentBehavior).getFollowerIds().size());
 
         MessageCollectorActor.expectFirstMatching(collector, ServerRemoved.class);
+
+        LOG.info("testRemoveServer ending");
     }
 
     @Test
     public void testRemoveServerLeader() {
+        LOG.info("testRemoveServerLeader starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
@@ -810,14 +906,12 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
 
-        TestActorRef<CollectingMockRaftActor> followerRaftActor = actorFactory.createTestActor(
-                CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
-                        configParams, NO_PERSISTENCE).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                followerActorId);
-
         TestActorRef<MessageCollectorActor> followerCollector = actorFactory.createTestActor(MessageCollectorActor.props().
                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("collector"));
-        followerRaftActor.underlyingActor().setCollectorActor(followerCollector);
+        actorFactory.createTestActor(
+                CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString()),
+                        configParams, NO_PERSISTENCE, followerCollector).withDispatcher(Dispatchers.DefaultDispatcherId()),
+                followerActorId);
 
         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
@@ -829,10 +923,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                 votingServer(FOLLOWER_ID));
 
         MessageCollectorActor.expectFirstMatching(leaderCollector, ServerRemoved.class);
+
+        LOG.info("testRemoveServerLeader ending");
     }
 
     @Test
     public void testRemoveServerLeaderWithNoFollowers() {
+        LOG.info("testRemoveServerLeaderWithNoFollowers starting");
+
         TestActorRef<MockLeaderRaftActor> leaderActor = actorFactory.createTestActor(
                 MockLeaderRaftActor.props(Collections.<String, String>emptyMap(),
                         new MockRaftActorContext()).withDispatcher(Dispatchers.DefaultDispatcherId()),
@@ -841,10 +939,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         leaderActor.tell(new RemoveServer(LEADER_ID), testKit.getRef());
         RemoveServerReply removeServerReply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), RemoveServerReply.class);
         assertEquals("getStatus", ServerChangeStatus.NOT_SUPPORTED, removeServerReply.getStatus());
+
+        LOG.info("testRemoveServerLeaderWithNoFollowers ending");
     }
 
     @Test
     public void testChangeServersVotingStatus() {
+        LOG.info("testChangeServersVotingStatus starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
         configParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
@@ -860,23 +962,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
 
+        TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
-                        FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
+                        FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
-        TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+
+        TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId("collector"));
-        follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
-
         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
-                        FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
+                        FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
-        TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId("collector"));
-        follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
 
         // Send first ChangeServersVotingStatus message
 
@@ -919,10 +1019,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         MessageCollectorActor.expectFirstMatching(follower2Collector, ApplyState.class);
         verifyServerConfigurationPayloadEntry(follower2RaftActor.underlyingActor().getRaftActorContext().getReplicatedLog(),
                 votingServer(LEADER_ID), votingServer(FOLLOWER_ID), nonVotingServer(FOLLOWER_ID2));
+
+        LOG.info("testChangeServersVotingStatus ending");
     }
 
     @Test
     public void testChangeLeaderToNonVoting() {
+        LOG.info("testChangeLeaderToNonVoting starting");
+
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
 
@@ -937,23 +1041,21 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
                         withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId(LEADER_ID));
         TestActorRef<MessageCollectorActor> leaderCollector = newLeaderCollectorActor(leaderActor.underlyingActor());
 
+        TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
         TestActorRef<CollectingMockRaftActor> follower1RaftActor = actorFactory.createTestActor(
                 CollectingMockRaftActor.props(FOLLOWER_ID, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
-                        FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE).
+                        FOLLOWER_ID2, follower2ActorPath), configParams, NO_PERSISTENCE, follower1Collector).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower1ActorId);
-        TestActorRef<MessageCollectorActor> follower1Collector = actorFactory.createTestActor(
+
+        TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
                 MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
                 actorFactory.generateActorId("collector"));
-        follower1RaftActor.underlyingActor().setCollectorActor(follower1Collector);
-
         TestActorRef<CollectingMockRaftActor> follower2RaftActor = actorFactory.createTestActor(
                 CollectingMockRaftActor.props(FOLLOWER_ID2, ImmutableMap.of(LEADER_ID, leaderActor.path().toString(),
-                        FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE).
+                        FOLLOWER_ID, follower1ActorPath), configParams, NO_PERSISTENCE, follower2Collector).
                         withDispatcher(Dispatchers.DefaultDispatcherId()), follower2ActorId);
-        TestActorRef<MessageCollectorActor> follower2Collector = actorFactory.createTestActor(
-                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
-                actorFactory.generateActorId("collector"));
-        follower2RaftActor.underlyingActor().setCollectorActor(follower2Collector);
 
         // Send ChangeServersVotingStatus message
 
@@ -977,6 +1079,326 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         verifyRaftState(RaftState.Follower, leaderActor.underlyingActor());
 
         MessageCollectorActor.expectMatching(leaderCollector, AppendEntries.class, 2);
+
+        LOG.info("testChangeLeaderToNonVoting ending");
+    }
+
+    @Test
+    public void testChangeToVotingWithNoLeader() {
+        LOG.info("testChangeToVotingWithNoLeader starting");
+
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+
+        final String node1ID = "node1";
+        final String node2ID = "node2";
+
+        // Set up a persisted ServerConfigurationPayload. Initially node1 and node2 will come up as non-voting.
+        // via the server config. The server config will also contain 2 voting peers that are down (ie no
+        // actors created).
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(node1ID, false), new ServerInfo(node2ID, false),
+                new ServerInfo("downNode1", true), new ServerInfo("downNode2", true)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "downNode1"));
+        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "downNode2"));
+        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+        // Wait for snapshot after recovery
+        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+        // Verify the intended server config was loaded and applied.
+        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+                nonVotingServer(node1ID), nonVotingServer(node2ID), votingServer("downNode1"),
+                votingServer("downNode2"));
+        assertEquals("isVotingMember", false, node1RaftActor.getRaftActorContext().isVotingMember());
+        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+        assertEquals("getLeaderId", null, node1RaftActor.getLeaderId());
+
+        MessageCollectorActor.expectFirstMatching(node2Collector, SnapshotComplete.class);
+        assertEquals("isVotingMember", false, node2RaftActor.getRaftActorContext().isVotingMember());
+
+        // For the test, we send a ChangeServersVotingStatus message to node1 to flip the voting states for
+        // each server, ie node1 and node2 to voting and the 2 down nodes to non-voting. This should cause
+        // node1 to try to elect itself as leader in order to apply the new server config. Since the 2
+        // down nodes are switched to non-voting, node1 should only need a vote from node2.
+
+        // First send the message such that node1 has no peer address for node2 - should fail.
+
+        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
+                node2ID, true, "downNode1", false, "downNode2", false));
+        node1RaftActorRef.tell(changeServers, testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+
+        // Update node2's peer address and send the message again
+
+        node1RaftActor.setPeerAddress(node2ID, node2RaftActorRef.path().toString());
+
+        node1RaftActorRef.tell(changeServers, testKit.getRef());
+        reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+        ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+        assertEquals("getToIndex", 1, apply.getToIndex());
+        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
+                nonVotingServer("downNode2"));
+        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+        assertEquals("getRaftState", RaftState.Leader, node1RaftActor.getRaftState());
+
+        apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+        assertEquals("getToIndex", 1, apply.getToIndex());
+        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
+                nonVotingServer("downNode2"));
+        assertEquals("isVotingMember", true, node2RaftActor.getRaftActorContext().isVotingMember());
+        assertEquals("getRaftState", RaftState.Follower, node2RaftActor.getRaftState());
+
+        LOG.info("testChangeToVotingWithNoLeader ending");
+    }
+
+    @Test
+    public void testChangeToVotingWithNoLeaderAndElectionTimeout() {
+        LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout starting");
+
+        final String node1ID = "node1";
+        final String node2ID = "node2";
+
+        PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
+            @Override
+            public String resolve(String peerId) {
+                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+            }
+        };
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+        DefaultConfigParamsImpl configParams1 = new DefaultConfigParamsImpl();
+        configParams1.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams1.setElectionTimeoutFactor(1);
+        configParams1.setPeerAddressResolver(peerAddressResolver);
+        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams1,
+                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+        DefaultConfigParamsImpl configParams2 = new DefaultConfigParamsImpl();
+        configParams2.setElectionTimeoutFactor(1000000);
+        configParams2.setPeerAddressResolver(peerAddressResolver);
+        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams2,
+                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+        // Wait for snapshot after recovery
+        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+        // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+        // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+        // RequestVote messages in node2 which should cause node1 to time out and revert back to the previous
+        // server config and fail with NO_LEADER. Note that node1 shouldn't forward the request to node2 b/c
+        // node2 was previously voting.
+
+        node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true));
+        node1RaftActorRef.tell(changeServers, testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.NO_LEADER, reply.getStatus());
+
+        assertEquals("Server config", Sets.newHashSet(nonVotingServer(node1ID), votingServer(node2ID)),
+                Sets.newHashSet(node1RaftActor.getRaftActorContext().getPeerServerInfo(true).getServerConfig()));
+        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+        LOG.info("testChangeToVotingWithNoLeaderAndElectionTimeout ending");
+    }
+
+    @Test
+    public void testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout() {
+        LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout starting");
+
+        final String node1ID = "node1";
+        final String node2ID = "node2";
+
+        PeerAddressResolver peerAddressResolver = new PeerAddressResolver() {
+            @Override
+            public String resolve(String peerId) {
+                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+            }
+        };
+
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(3);
+        configParams.setPeerAddressResolver(peerAddressResolver);
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(node1ID, false), new ServerInfo(node2ID, false)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 3, new ReplicatedLogImplEntry(1, 1,
+                new MockRaftActorContext.MockPayload("2")));
+
+        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+        // Wait for snapshot after recovery
+        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+        // Send a ChangeServersVotingStatus message to node1 to change mode1 to voting. This should cause
+        // node1 to try to elect itself as leader in order to apply the new server config. However node1's log
+        // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
+        // forward the request to node2.
+
+        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
+                ImmutableMap.of(node1ID, true, node2ID, true));
+        node1RaftActorRef.tell(changeServers, testKit.getRef());
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID));
+        assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID));
+        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+        LOG.info("testChangeToVotingWithNoLeaderAndForwardedToOtherNodeAfterElectionTimeout ending");
+    }
+
+    @Test
+    public void testChangeToVotingWithNoLeaderAndOtherLeaderElected() {
+        LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected starting");
+
+        DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+        configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        configParams.setElectionTimeoutFactor(100000);
+
+        final String node1ID = "node1";
+        final String node2ID = "node2";
+
+        configParams.setPeerAddressResolver(new PeerAddressResolver() {
+            @Override
+            public String resolve(String peerId) {
+                return peerId.equals(node1ID) ? actorFactory.createTestActorPath(node1ID) :
+                    peerId.equals(node2ID) ? actorFactory.createTestActorPath(node2ID) : null;
+            }
+        });
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(node1ID, false), new ServerInfo(node2ID, true)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, 1, persistedServerConfig);
+
+        InMemoryJournal.addEntry(node1ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node1ID, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(node2ID, 1, new UpdateElectionTerm(1, "node1"));
+        InMemoryJournal.addEntry(node2ID, 2, persistedServerConfigEntry);
+
+        TestActorRef<MessageCollectorActor> node1Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node1RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node1ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node1Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node1ID);
+        CollectingMockRaftActor node1RaftActor = node1RaftActorRef.underlyingActor();
+
+        TestActorRef<MessageCollectorActor> node2Collector = actorFactory.createTestActor(
+                MessageCollectorActor.props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                actorFactory.generateActorId("collector"));
+        TestActorRef<CollectingMockRaftActor> node2RaftActorRef = actorFactory.createTestActor(
+                CollectingMockRaftActor.props(node2ID, ImmutableMap.<String, String>of(), configParams,
+                        PERSISTENT, node2Collector).withDispatcher(Dispatchers.DefaultDispatcherId()), node2ID);
+        CollectingMockRaftActor node2RaftActor = node2RaftActorRef.underlyingActor();
+
+        // Wait for snapshot after recovery
+        MessageCollectorActor.expectFirstMatching(node1Collector, SnapshotComplete.class);
+
+        // Send a ChangeServersVotingStatus message to node1 to change node1 to voting. This should cause
+        // node1 to try to elect itself as leader in order to apply the new server config. But we'll drop
+        // RequestVote messages in node2 and make it the leader so node1 should forward the server change
+        // request to node2 when node2 is elected.
+
+        node2RaftActor.setDropMessageOfType(RequestVote.class);
+
+        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(ImmutableMap.of(node1ID, true,
+                node2ID, true));
+        node1RaftActorRef.tell(changeServers, testKit.getRef());
+
+        MessageCollectorActor.expectFirstMatching(node2Collector, RequestVote.class);
+
+        node2RaftActorRef.tell(ElectionTimeout.INSTANCE, ActorRef.noSender());
+
+        ServerChangeReply reply = testKit.expectMsgClass(JavaTestKit.duration("5 seconds"), ServerChangeReply.class);
+        assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
+
+        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+        verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID));
+        assertEquals("isVotingMember", true, node1RaftActor.getRaftActorContext().isVotingMember());
+        assertEquals("getRaftState", RaftState.Follower, node1RaftActor.getRaftState());
+
+        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+        verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
+                votingServer(node1ID), votingServer(node2ID));
+        assertEquals("getRaftState", RaftState.Leader, node2RaftActor.getRaftState());
+
+        LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
     }
 
     private void verifyRaftState(RaftState expState, RaftActor... raftActors) {
@@ -1016,17 +1438,18 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ReplicatedLogEntry logEntry = log.get(log.lastIndex());
         assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
         ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
-        assertEquals("getNewServerConfig", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
+        assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
     }
 
     private static RaftActorContext newFollowerContext(String id, TestActorRef<? extends UntypedActor> actor) {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
         configParams.setElectionTimeoutFactor(100000);
-        ElectionTermImpl termInfo = new ElectionTermImpl(NO_PERSISTENCE, id, LOG);
+        NonPersistentDataProvider noPersistence = new NonPersistentDataProvider();
+        ElectionTermImpl termInfo = new ElectionTermImpl(noPersistence, id, LOG);
         termInfo.update(1, LEADER_ID);
         return new RaftActorContextImpl(actor, actor.underlyingActor().getContext(),
-                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, NO_PERSISTENCE, LOG);
+                id, termInfo, -1, -1, ImmutableMap.of(LEADER_ID, ""), configParams, noPersistence, LOG);
     }
 
     static abstract class AbstractMockRaftActor extends MockRaftActor {
@@ -1034,9 +1457,9 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         private volatile Class<?> dropMessageOfType;
 
         AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
-                DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
+                boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
             super(builder().id(id).peerAddresses(peerAddresses).config(config.get()).
-                    dataPersistenceProvider(dataPersistenceProvider));
+                    persistent(Optional.of(persistent)));
             this.collectorActor = collectorActor;
         }
 
@@ -1062,14 +1485,26 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
     public static class CollectingMockRaftActor extends AbstractMockRaftActor {
 
-        CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider, TestActorRef<MessageCollectorActor> collectorActor) {
-            super(id, peerAddresses, config, dataPersistenceProvider, collectorActor);
+        CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
+                boolean persistent, TestActorRef<MessageCollectorActor> collectorActor) {
+            super(id, peerAddresses, config, persistent, collectorActor);
+            snapshotCohortDelegate = new RaftActorSnapshotCohort() {
+                @Override
+                public void createSnapshot(ActorRef actorRef) {
+                    actorRef.tell(new CaptureSnapshotReply(new byte[0]), actorRef);
+                }
+
+                @Override
+                public void applySnapshot(byte[] snapshotBytes) {
+                }
+            };
         }
 
         public static Props props(final String id, final Map<String, String> peerAddresses,
-                                  ConfigParams config, DataPersistenceProvider dataPersistenceProvider){
+                ConfigParams config, boolean persistent, TestActorRef<MessageCollectorActor> collectorActor){
 
-            return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config), dataPersistenceProvider, null);
+            return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
+                    persistent, collectorActor);
         }
 
     }
@@ -1118,7 +1553,7 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
 
     public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
         public MockNewFollowerRaftActor(ConfigParams config, TestActorRef<MessageCollectorActor> collectorActor) {
-            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), null, collectorActor);
+            super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE, collectorActor);
             setPersistence(false);
         }