Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
old mode 100644 (file)
new mode 100755 (executable)
index aea1e9f..dc97336
@@ -6,26 +6,27 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
-import akka.japi.Procedure;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
@@ -33,14 +34,14 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
-import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -49,16 +50,22 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Immutable;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
  * In Search of an Understandable Consensus Algorithm</a>
- * <p/>
+ *
+ * <p>
  * RaftActor has 3 states and each state has a certain behavior associated
  * with it. A Raft actor can behave as,
  * <ul>
@@ -66,27 +73,26 @@ import org.slf4j.LoggerFactory;
  * <li> A Follower (or) </li>
  * <li> A Candidate </li>
  * </ul>
- * <p/>
- * <p/>
+ *
+ * <p>
  * A RaftActor MUST be a Leader in order to accept requests from clients to
  * change the state of it's encapsulated state machine. Once a RaftActor becomes
  * a Leader it is also responsible for ensuring that all followers ultimately
  * have the same log and therefore the same state machine as itself.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The current behavior of a RaftActor determines how election for leadership
  * is initiated and how peer RaftActors react to request for votes.
- * <p/>
- * <p/>
+ *
+ * <p>
  * Each RaftActor also needs to know the current election term. It uses this
  * information for a couple of things. One is to simply figure out who it
  * voted for in the last election. Another is to figure out if the message
  * it received to update it's state is stale.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The RaftActor uses akka-persistence to store it's replicated log.
  * Furthermore through it's behaviors a Raft Actor determines
- * <p/>
  * <ul>
  * <li> when a log entry should be persisted </li>
  * <li> when a log entry should be applied to the state machine (and) </li>
@@ -97,17 +103,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
-    /**
-     * The current state determines the current behavior of a RaftActor
-     * A Raft Actor always starts off in the Follower State
-     */
-    private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
-
     /**
      * This context should NOT be passed directly to any other actor it is
-     * only to be consumed by the RaftActorBehaviors
+     * only to be consumed by the RaftActorBehaviors.
      */
     private final RaftActorContextImpl context;
 
@@ -115,22 +113,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final PersistentDataProvider persistentProvider;
 
+    private final BehaviorStateTracker behaviorStateTracker = new BehaviorStateTracker();
+
     private RaftActorRecoverySupport raftRecovery;
 
     private RaftActorSnapshotMessageSupport snapshotSupport;
 
-    private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
-
-    private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier();
-
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
-    public RaftActor(String id, Map<String, String> peerAddresses,
-         Optional<ConfigParams> configParams, short payloadVersion) {
+    protected RaftActor(final String id, final Map<String, String> peerAddresses,
+         final Optional<ConfigParams> configParams, final short payloadVersion) {
 
         persistentProvider = new PersistentDataProvider(this);
         delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
@@ -138,11 +132,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            delegatingPersistenceProvider, LOG);
+            configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
+            delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf);
 
         context.setPayloadVersion(payloadVersion);
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior));
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     @Override
@@ -158,76 +152,82 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior.getDelegate() != null) {
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
-            }
-        }
-
+        context.close();
         super.postStop();
     }
 
     @Override
-    public void handleRecover(Object message) {
-        if(raftRecovery == null) {
+    protected void handleRecover(final Object message) {
+        if (raftRecovery == null) {
             raftRecovery = newRaftActorRecoverySupport();
         }
 
         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
-        if(recoveryComplete) {
+        if (recoveryComplete) {
             onRecoveryComplete();
 
             initializeBehavior();
 
             raftRecovery = null;
-
-            if (context.getReplicatedLog().size() > 0) {
-                self().tell(new InitiateCaptureSnapshot(), self());
-                LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
-            } else {
-                LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
-            }
         }
     }
 
     protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
-        return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort());
+        return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
     }
 
-    protected void initializeBehavior(){
+    @VisibleForTesting
+    void initializeBehavior() {
         changeCurrentBehavior(new Follower(context));
     }
 
-    protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        reusableBehaviorStateHolder.init(getCurrentBehavior());
+    @VisibleForTesting
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        if (currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e);
+            }
+        }
+
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
         setCurrentBehavior(newBehavior);
-        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+        handleBehaviorChange(state, newBehavior);
     }
 
+    /**
+     * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
+     * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
+     * implementation for messages which they do not handle
+     *
+     * @param message Incoming command message
+     */
+    protected void handleNonRaftCommand(final Object message) {
+        unhandled(message);
+    }
+
+    /**
+     * Handles a message.
+     *
+     * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
+     * {@link #handleNonRaftCommand(Object)} instead.
+     */
+    @Deprecated
     @Override
-    public void handleCommand(final Object message) {
-        if(serverConfigurationSupport.handleMessage(message, getSender())) {
+    // FIXME: make this method final once our unit tests do not need to override it
+    protected void handleCommand(final Object message) {
+        if (serverConfigurationSupport.handleMessage(message, getSender())) {
+            return;
+        }
+        if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
             return;
-        } else if (message instanceof ApplyState){
+        }
+        if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long elapsedTime = (System.nanoTime() - applyState.getStartTime());
-            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
-                LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                applyState.getReplicatedLogEntry().getData());
-
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -235,58 +235,122 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // and recovery shows data missing
                 context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
 
-                context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+                context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
-        } else if (message instanceof ApplyJournalEntries){
+            possiblyHandleBehaviorMessage(message);
+        } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
-            }
+            LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-            persistence().persist(applyEntries, NoopProcedure.instance());
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-        } else if(message instanceof GetOnDemandRaftState) {
+        } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if(message instanceof InitiateCaptureSnapshot) {
+        } else if (message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
-        } else if(message instanceof SwitchBehavior){
-            switchBehavior(((SwitchBehavior) message));
-        } else if(message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof SwitchBehavior) {
+            switchBehavior((SwitchBehavior) message);
+        } else if (message instanceof LeaderTransitioning) {
+            onLeaderTransitioning((LeaderTransitioning)message);
+        } else if (message instanceof Shutdown) {
             onShutDown();
-        } else if(message instanceof Runnable) {
+        } else if (message instanceof Runnable) {
             ((Runnable)message).run();
-        } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
-            switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+        } else if (message instanceof NoopPayload) {
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
+        } else if (!possiblyHandleBehaviorMessage(message)) {
+            handleNonRaftCommand(message);
+        }
+    }
+
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), message, getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+    }
+
+    private boolean possiblyHandleBehaviorMessage(final Object message) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+        // A behavior indicates that it processed the change by returning a reference to the next behavior
+        // to be used. A null return indicates it has not processed the message and we should be passing it to
+        // the subclass for handling.
+        final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+        if (nextBehavior != null) {
+            switchBehavior(state, nextBehavior);
+            return true;
         }
+
+        return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+            final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
-        if(leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+        if (leadershipTransferInProgress == null) {
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
-                    leadershipTransferInProgress = null;
+                public void onSuccess(final ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
-                    leadershipTransferInProgress = null;
+                public void onFailure(final ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -296,26 +360,39 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onShutDown() {
         LOG.debug("{}: onShutDown", persistenceId());
 
-        if(shuttingDown) {
+        if (shuttingDown) {
             return;
         }
 
         shuttingDown = true;
-        if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
+        }
+
+        if (context.hasFollowers()) {
             initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onSuccess(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
 
                 @Override
-                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                public void onFailure(final ActorRef raftActorRef) {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
-        } else if(currentBehavior.state() == RaftState.Leader) {
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
+        } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
                 protected void doRun() {
@@ -327,54 +404,54 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     self().tell(PoisonPill.getInstance(), self());
                 }
             });
-        } else {
-            self().tell(PoisonPill.getInstance(), self());
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(currentBehavior.state() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
-                    currentBehavior.getLeaderPayloadVersion()), getSelf());
+                getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
 
-    private void switchBehavior(SwitchBehavior message) {
-        if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+    private void switchBehavior(final SwitchBehavior message) {
+        if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
-            if( newState == RaftState.Leader || newState == RaftState.Follower) {
-                switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
+            if (newState == RaftState.Leader || newState == RaftState.Follower) {
                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
+                switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
+                    AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
             } else {
                 LOG.warn("Switching to behavior : {} - not supported", newState);
             }
         }
     }
 
-    private void switchBehavior(Supplier<RaftActorBehavior> supplier){
-        reusableBehaviorStateHolder.init(getCurrentBehavior());
-
-        setCurrentBehavior(supplier.get());
-
-        handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
+    private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) {
+        setCurrentBehavior(nextBehavior);
+        handleBehaviorChange(oldBehaviorState, nextBehavior);
     }
 
-    protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
-        return new RaftActorSnapshotMessageSupport(context, currentBehavior,
-                getRaftActorSnapshotCohort());
+    @VisibleForTesting
+    RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
     }
 
     private void onGetOnDemandRaftStats() {
         // Debugging message to retrieve raft stats.
 
         Map<String, String> peerAddresses = new HashMap<>();
-        for(String peerId: context.getPeerIds()) {
-            peerAddresses.put(peerId, context.getPeerAddress(peerId));
+        Map<String, Boolean> peerVotingStates = new HashMap<>();
+        for (PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.isVoting());
+            peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
 
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+        OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -389,23 +466,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
+                .isVoting(context.isVotingMember())
                 .peerAddresses(peerAddresses)
+                .peerVotingStates(peerVotingStates)
                 .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
-        ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+        ReplicatedLogEntry lastLogEntry = replicatedLog().last();
         if (lastLogEntry != null) {
             builder.lastLogIndex(lastLogEntry.getIndex());
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(getCurrentBehavior() instanceof AbstractLeader) {
+        if (getCurrentBehavior() instanceof AbstractLeader) {
             AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
-            for(String id: followerIds) {
+            for (String id: followerIds) {
                 final FollowerLogInformation info = leader.getFollower(id);
                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
-                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(
+                            TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())),
+                        context.getPeerInfo(info.getId()).isVoting()));
             }
 
             builder.followerInfoList(followerInfoList);
@@ -415,40 +496,73 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
-    private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
+    private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
-        if (oldBehavior != currentBehavior){
+        if (oldBehavior != currentBehavior) {
             onStateChanged();
         }
 
+        String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId();
         String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
         // it can happen that the state has not changed but the leader has changed.
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) ||
-           oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
-            if(roleChangeNotifier.isPresent()) {
+        if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+                || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+            if (roleChangeNotifier.isPresent()) {
                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
-            if(leadershipTransferInProgress != null) {
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
+            if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
+
+            serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
         }
 
-        if (roleChangeNotifier.isPresent() &&
-                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+        if (roleChangeNotifier.isPresent()
+                && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
     }
 
-    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+    private void handleApplyState(final ApplyState applyState) {
+        long startTime = System.nanoTime();
+
+        Payload payload = applyState.getReplicatedLogEntry().getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+        }
+
+        if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+        }
+
+        long elapsedTime = System.nanoTime() - startTime;
+        if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+        }
+
+        // Send the ApplyState message back to self to handle further processing asynchronously.
+        self().tell(applyState, self());
+    }
+
+    protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+            final short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
 
@@ -456,57 +570,64 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     public long snapshotSequenceNr() {
         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
         // so that we can delete the persistent journal based on the saved sequence-number
-        // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
-        // was saved and not the number we saved.
-        // We would want to override it , by asking akka to use the last-sequence number known to us.
+        // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+        // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+        // last-sequence number known to us.
         return context.getSnapshotManager().getLastSequenceNumber();
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
      *
-     * @param clientActor
-     * @param identifier
-     * @param data
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected void persistData(final ActorRef clientActor, final String identifier,
-        final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
-        }
+        LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-            @Override
-            public void apply(ReplicatedLogEntry replicatedLogEntry) {
-                if (!hasFollowers()){
-                    // Increment the Commit Index and the Last Applied values
-                    raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                    raftContext.setLastApplied(replicatedLogEntry.getIndex());
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
 
-                    // Apply the state immediately.
-                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
+            if (!hasFollowers()) {
+                // Increment the Commit Index and the Last Applied values
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
 
-                    // Send a ApplyJournalEntries message so that we write the fact that we applied
-                    // the state to durable storage
-                    self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+                // Apply the state immediately.
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
-                } else if (clientActor != null) {
-                    context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+                // Send a ApplyJournalEntries message so that we write the fact that we applied
+                // the state to durable storage
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
 
-                    // Send message for replication
-                    currentBehavior.handleMessage(getSelf(),
-                            new Replicate(clientActor, identifier, replicatedLogEntry));
-                }
+            } else {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
             }
-        });
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
+        }
     }
 
     private ReplicatedLog replicatedLog() {
@@ -518,30 +639,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @VisibleForTesting
-    void setCurrentBehavior(RaftActorBehavior behavior) {
-        currentBehavior.setDelegate(behavior);
+    void setCurrentBehavior(final RaftActorBehavior behavior) {
+        context.setCurrentBehavior(behavior);
     }
 
     protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior.getDelegate();
+        return context.getCurrentBehavior();
     }
 
     /**
      * Derived actors can call the isLeader method to check if the current
-     * RaftActor is the Leader or not
+     * RaftActor is the Leader or not.
      *
      * @return true it this RaftActor is a Leader false otherwise
      */
     protected boolean isLeader() {
-        return context.getId().equals(currentBehavior.getLeaderId());
+        return context.getId().equals(getCurrentBehavior().getLeaderId());
     }
 
-    protected boolean isLeaderActive() {
-        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
-                !isLeadershipTransferInProgress();
+    protected final boolean isLeaderActive() {
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+                && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -552,10 +674,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    protected ActorSelection getLeader(){
+    public ActorSelection getLeader() {
         String leaderAddress = getLeaderAddress();
 
-        if(leaderAddress == null){
+        if (leaderAddress == null) {
             return null;
         }
 
@@ -563,58 +685,47 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
+     * Returns the id of the current leader.
      *
      * @return the current leader's id
      */
-    protected String getLeaderId(){
-        return currentBehavior.getLeaderId();
+    protected final String getLeaderId() {
+        return getCurrentBehavior().getLeaderId();
     }
 
-    protected RaftState getRaftState() {
-        return currentBehavior.state();
-    }
-
-    protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog().last();
+    @VisibleForTesting
+    protected final RaftState getRaftState() {
+        return getCurrentBehavior().state();
     }
 
-    protected Long getCurrentTerm(){
+    protected Long getCurrentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
-    protected Long getCommitIndex(){
-        return context.getCommitIndex();
-    }
-
-    protected Long getLastApplied(){
-        return context.getLastApplied();
-    }
-
     protected RaftActorContext getRaftActorContext() {
         return context;
     }
 
-    protected void updateConfigParams(ConfigParams configParams) {
+    protected void updateConfigParams(final ConfigParams configParams) {
 
         // obtain the RaftPolicy for oldConfigParams and the updated one.
-        String oldRaftPolicy = context.getConfigParams().
-            getCustomRaftPolicyImplementationClass();
-        String newRaftPolicy = configParams.
-            getCustomRaftPolicyImplementationClass();
+        String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+        String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
 
         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
             oldRaftPolicy, newRaftPolicy);
         context.setConfigParams(configParams);
-        if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) {
+        if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
             // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
             // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
             // avoids potential disruption. Otherwise, switch to Follower normally.
-            RaftActorBehavior behavior = currentBehavior.getDelegate();
-            if(behavior instanceof Follower) {
-                String previousLeaderId = ((Follower)behavior).getLeaderId();
+            RaftActorBehavior behavior = getCurrentBehavior();
+            if (behavior != null && behavior.state() == RaftState.Follower) {
+                String previousLeaderId = behavior.getLeaderId();
                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
 
-                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+                        previousLeaderId);
 
                 changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
             } else {
@@ -627,16 +738,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return delegatingPersistenceProvider.getDelegate();
     }
 
-    public void setPersistence(DataPersistenceProvider provider) {
+    public void setPersistence(final DataPersistenceProvider provider) {
         delegatingPersistenceProvider.setDelegate(provider);
     }
 
-    protected void setPersistence(boolean persistent) {
-        if(persistent) {
+    protected void setPersistence(final boolean persistent) {
+        DataPersistenceProvider currentPersistence = persistence();
+        if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
-        } else {
-            setPersistence(new NonPersistentDataProvider() {
-                /**
+
+            if (getCurrentBehavior() != null) {
+                LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+                captureSnapshot();
+            }
+        } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+            setPersistence(new NonPersistentDataProvider(this) {
+                /*
                  * The way snapshotting works is,
                  * <ol>
                  * <li> RaftActor calls createSnapshot on the Shard
@@ -648,7 +765,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                  * </ol>
                  */
                 @Override
-                public void saveSnapshot(Object o) {
+                public void saveSnapshot(final Object object) {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
@@ -660,25 +777,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     /**
      * setPeerAddress sets the address of a known peer at a later time.
+     *
      * <p>
      * This is to account for situations where a we know that a peer
      * exists but we do not know an address up-front. This may also be used in
      * situations where a known peer starts off in a different location and we
      * need to change it's address
+     *
      * <p>
      * Note that if the peerId does not match the list of peers passed to
      * this actor during construction an IllegalStateException will be thrown.
-     *
-     * @param peerId
-     * @param peerAddress
      */
-    protected void setPeerAddress(String peerId, String peerAddress){
+    protected void setPeerAddress(final String peerId, final String peerAddress) {
         context.setPeerAddress(peerId, peerAddress);
     }
 
     /**
      * The applyState method will be called by the RaftActor when some data
-     * needs to be applied to the actor's state
+     * needs to be applied to the actor's state.
      *
      * @param clientActor A reference to the client who sent this message. This
      *                    is the same reference that was passed to persistData
@@ -693,14 +809,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param data        A piece of data that was persisted by the persistData call.
      *                    This should NEVER be null.
      */
-    protected abstract void applyState(ActorRef clientActor, String identifier,
-        Object data);
+    protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
 
     /**
      * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
      */
-    @Nonnull
-    protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
+    protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
 
     /**
      * This method is called when recovery is complete.
@@ -708,10 +822,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
-    @Nonnull
-    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
+    protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -721,149 +834,207 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onStateChanged();
 
     /**
-     * Notifier Actor for this RaftActor to notify when a role change happens
+     * Notifier Actor for this RaftActor to notify when a role change happens.
+     *
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
 
+    /**
+     * This method is called on the leader when a voting change operation completes.
+     */
+    protected void onVotingStateChangeComplete() {
+    }
+
     /**
      * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
      * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
      * work prior to performing the operation. On completion of any work, the run method must be called on the
      * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
      * this actor's thread dispatcher as as it modifies internal state.
+     *
      * <p>
      * The default implementation immediately runs the operation.
      *
      * @param operation the operation to run
      */
-    protected void pauseLeader(Runnable operation) {
+    protected void pauseLeader(final Runnable operation) {
         operation.run();
     }
 
-    protected void onLeaderChanged(String oldLeader, String newLeader){};
+    /**
+     * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+     * should resume normal operations.
+     *
+     * <p>
+     * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+     */
+    protected void unpauseLeader() {
+
+    }
+
+    protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+    }
 
-    private String getLeaderAddress(){
-        if(isLeader()){
+    private String getLeaderAddress() {
+        if (isLeader()) {
             return getSelf().path().toString();
         }
-        String leaderId = currentBehavior.getLeaderId();
+        String leaderId = getLeaderId();
         if (leaderId == null) {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
-                    persistenceId(), leaderId, peerAddress);
-        }
+        LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
 
         return peerAddress;
     }
 
-    protected boolean hasFollowers(){
+    protected boolean hasFollowers() {
         return getRaftActorContext().hasFollowers();
     }
 
     private void captureSnapshot() {
         SnapshotManager snapshotManager = context.getSnapshotManager();
 
-        if(!snapshotManager.isCapturing()) {
+        if (!snapshotManager.isCapturing()) {
+            final long idx = getCurrentBehavior().getReplicatedToAllIndex();
             LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
-                replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+                replicatedLog().last(), idx);
 
-            snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex());
+            snapshotManager.capture(replicatedLog().last(), idx);
         }
     }
 
     /**
-     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
-     *             whose type for fromIndex is long instead of int. This class was kept for backwards
-     *             compatibility with Helium.
+     * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
+     * in which case we need to step down.
      */
-    // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
-    @SuppressWarnings("serial")
-    @Deprecated
-    static class DeleteEntries implements Serializable {
-        private final int fromIndex;
+    void becomeNonVoting() {
+        if (isLeader()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(final ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+                    ensureFollowerState();
+                }
 
-        public DeleteEntries(int fromIndex) {
-            this.fromIndex = fromIndex;
-        }
+                @Override
+                public void onFailure(final ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+                    ensureFollowerState();
+                }
 
-        public int getFromIndex() {
-            return fromIndex;
+                private void ensureFollowerState() {
+                    // Whether or not leadership transfer succeeded, we have to step down as leader and
+                    // switch to Follower so ensure that.
+                    if (getRaftState() != RaftState.Follower) {
+                        initializeBehavior();
+                    }
+                }
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }
 
     /**
-     * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
-     *             which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
+     * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
      */
-    // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
-    @SuppressWarnings("serial")
-    @Deprecated
-    static class UpdateElectionTerm implements Serializable {
-        private final long currentTerm;
-        private final String votedFor;
+    private abstract static class BehaviorState implements Immutable {
+        @Nullable abstract RaftActorBehavior getBehavior();
 
-        public UpdateElectionTerm(long currentTerm, String votedFor) {
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
-        }
+        @Nullable abstract String getLastValidLeaderId();
 
-        public long getCurrentTerm() {
-            return currentTerm;
-        }
+        @Nullable abstract String getLastLeaderId();
 
-        public String getVotedFor() {
-            return votedFor;
-        }
+        abstract short getLeaderPayloadVersion();
     }
 
-    private static class BehaviorStateHolder {
-        private RaftActorBehavior behavior;
-        private String lastValidLeaderId;
-        private short leaderPayloadVersion;
-
-        void init(RaftActorBehavior behavior) {
-            this.behavior = behavior;
-            this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1;
-
-            String behaviorLeaderId = behavior != null ? behavior.getLeaderId() : null;
-            if(behaviorLeaderId != null) {
-                this.lastValidLeaderId = behaviorLeaderId;
-            }
+    /**
+     * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state.
+     */
+    private static final class SimpleBehaviorState extends BehaviorState {
+        private final RaftActorBehavior behavior;
+        private final String lastValidLeaderId;
+        private final String lastLeaderId;
+        private final short leaderPayloadVersion;
+
+        SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
+                final RaftActorBehavior behavior) {
+            this.lastValidLeaderId = lastValidLeaderId;
+            this.lastLeaderId = lastLeaderId;
+            this.behavior = requireNonNull(behavior);
+            this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
         }
 
+        @Override
         RaftActorBehavior getBehavior() {
             return behavior;
         }
 
+        @Override
         String getLastValidLeaderId() {
             return lastValidLeaderId;
         }
 
+        @Override
         short getLeaderPayloadVersion() {
             return leaderPayloadVersion;
         }
+
+        @Override
+        String getLastLeaderId() {
+            return lastLeaderId;
+        }
     }
 
-    private class SwitchBehaviorSupplier implements Supplier<RaftActorBehavior> {
-        private Object message;
-        private ActorRef sender;
+    /**
+     * Class tracking behavior-related information, which we need to keep around and pass across behavior switches.
+     * An instance is created for each RaftActor. It has two functions:
+     * - it keeps track of the last leader ID we have encountered since we have been created
+     * - it creates state capture needed to transition from one behavior to the next
+     */
+    private static final class BehaviorStateTracker {
+        /**
+         * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
+         * allowed before we receive the first message, we know the leader ID to be null.
+         */
+        private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
+            @Override
+            RaftActorBehavior getBehavior() {
+                return null;
+            }
 
-        public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){
-            this.sender = sender;
-            this.message = message;
-            return this;
-        }
+            @Override
+            String getLastValidLeaderId() {
+                return null;
+            }
 
-        @Override
-        public RaftActorBehavior get() {
-            if(this.message instanceof SwitchBehavior){
-                return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext());
+            @Override
+            short getLeaderPayloadVersion() {
+                return -1;
             }
-            return currentBehavior.handleMessage(sender, message);
+
+            @Override
+            String getLastLeaderId() {
+                return null;
+            }
+        };
+
+        private String lastValidLeaderId;
+        private String lastLeaderId;
+
+        BehaviorState capture(final RaftActorBehavior behavior) {
+            if (behavior == null) {
+                verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+                return NULL_BEHAVIOR_STATE;
+            }
+
+            lastLeaderId = behavior.getLeaderId();
+            if (lastLeaderId != null) {
+                lastValidLeaderId = lastLeaderId;
+            }
+
+            return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
         }
     }
 }