BUG-8618: introduce RaftActor.unpauseLeader()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index eaa005e3efdf6911a7beadce66e427d68a89161e..6d01752a95e17b4c45b4e62f7b96fc85ee730f3c 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -10,45 +11,61 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
-import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.protobuf.ByteString;
-import java.io.Serializable;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
+import org.opendaylight.controller.cluster.NonPersistentDataProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Immutable;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
  * In Search of an Understandable Consensus Algorithm</a>
- * <p/>
+ *
+ * <p>
  * RaftActor has 3 states and each state has a certain behavior associated
  * with it. A Raft actor can behave as,
  * <ul>
@@ -56,27 +73,26 @@ import org.slf4j.LoggerFactory;
  * <li> A Follower (or) </li>
  * <li> A Candidate </li>
  * </ul>
- * <p/>
- * <p/>
+ *
+ * <p>
  * A RaftActor MUST be a Leader in order to accept requests from clients to
  * change the state of it's encapsulated state machine. Once a RaftActor becomes
  * a Leader it is also responsible for ensuring that all followers ultimately
  * have the same log and therefore the same state machine as itself.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The current behavior of a RaftActor determines how election for leadership
  * is initiated and how peer RaftActors react to request for votes.
- * <p/>
- * <p/>
+ *
+ * <p>
  * Each RaftActor also needs to know the current election term. It uses this
  * information for a couple of things. One is to simply figure out who it
  * voted for in the last election. Another is to figure out if the message
  * it received to update it's state is stale.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The RaftActor uses akka-persistence to store it's replicated log.
  * Furthermore through it's behaviors a Raft Actor determines
- * <p/>
  * <ul>
  * <li> when a log entry should be persisted </li>
  * <li> when a log entry should be applied to the state machine (and) </li>
@@ -87,56 +103,40 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
-            new Procedure<ApplyJournalEntries>() {
-                @Override
-                public void apply(ApplyJournalEntries param) throws Exception {
-                }
-            };
-
-    protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
-    /**
-     * The current state determines the current behavior of a RaftActor
-     * A Raft Actor always starts off in the Follower State
-     */
-    private RaftActorBehavior currentBehavior;
-
     /**
      * This context should NOT be passed directly to any other actor it is
-     * only to be consumed by the RaftActorBehaviors
+     * only to be consumed by the RaftActorBehaviors.
      */
     private final RaftActorContextImpl context;
 
-    /**
-     * The in-memory journal
-     */
-    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
+    private final DelegatingPersistentDataProvider delegatingPersistenceProvider;
 
-    private CaptureSnapshot captureSnapshot = null;
+    private final PersistentDataProvider persistentProvider;
 
-    private Stopwatch recoveryTimer;
+    private final BehaviorStateTracker behaviorStateTracker = new BehaviorStateTracker();
 
-    private int currentRecoveryBatchCount;
+    private RaftActorRecoverySupport raftRecovery;
 
-    public RaftActor(String id, Map<String, String> peerAddresses) {
-        this(id, peerAddresses, Optional.<ConfigParams>absent());
-    }
+    private RaftActorSnapshotMessageSupport snapshotSupport;
 
-    public RaftActor(String id, Map<String, String> peerAddresses,
-         Optional<ConfigParams> configParams) {
+    private RaftActorServerConfigurationSupport serverConfigurationSupport;
+
+    private boolean shuttingDown;
+
+    protected RaftActor(String id, Map<String, String> peerAddresses,
+         Optional<ConfigParams> configParams, short payloadVersion) {
+
+        persistentProvider = new PersistentDataProvider(this);
+        delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider);
 
         context = new RaftActorContextImpl(this.getSelf(),
-            this.getContext(), id, new ElectionTermImpl(),
-            -1, -1, replicatedLog, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            LOG);
-    }
+            this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
+            -1, -1, peerAddresses,
+            configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
+            delegatingPersistenceProvider, this::handleApplyState, LOG);
 
-    private void initRecoveryTimer() {
-        if(recoveryTimer == null) {
-            recoveryTimer = Stopwatch.createStarted();
-        }
+        context.setPayloadVersion(payloadVersion);
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
     }
 
     @Override
@@ -145,342 +145,524 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getConfigParams().getJournalRecoveryLogBatchSize());
 
         super.preStart();
+
+        snapshotSupport = newRaftActorSnapshotMessageSupport();
+        serverConfigurationSupport = new RaftActorServerConfigurationSupport(this);
     }
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
-            try {
-                currentBehavior.close();
-            } catch (Exception e) {
-                LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
-            }
-        }
-
+        context.close();
         super.postStop();
     }
 
     @Override
-    public void handleRecover(Object message) {
-        if(persistence().isRecoveryApplicable()) {
-            if (message instanceof SnapshotOffer) {
-                onRecoveredSnapshot((SnapshotOffer) message);
-            } else if (message instanceof ReplicatedLogEntry) {
-                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-            } else if (message instanceof ApplyLogEntries) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
-            } else if (message instanceof ApplyJournalEntries) {
-                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-            } else if (message instanceof DeleteEntries) {
-                replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
-            } else if (message instanceof RecoveryCompleted) {
-                onRecoveryCompletedMessage();
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
-                // Delete all the messages from the akka journal so that we do not end up with consistency issues
-                // Note I am not using the dataPersistenceProvider and directly using the akka api here
-                deleteMessages(lastSequenceNr());
+    protected void handleRecover(Object message) {
+        if (raftRecovery == null) {
+            raftRecovery = newRaftActorRecoverySupport();
+        }
 
-                // Delete all the akka snapshots as they will not be needed
-                deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
+        boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
+        if (recoveryComplete) {
+            onRecoveryComplete();
 
-                onRecoveryComplete();
+            initializeBehavior();
 
-                initializeBehavior();
-            }
+            raftRecovery = null;
         }
     }
 
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: SnapshotOffer called..", persistenceId());
+    protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
+        return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
+    }
+
+    @VisibleForTesting
+    void initializeBehavior() {
+        changeCurrentBehavior(new Follower(context));
+    }
+
+    @VisibleForTesting
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        if (currentBehavior != null) {
+            try {
+                currentBehavior.close();
+            } catch (Exception e) {
+                LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e);
+            }
         }
 
-        initRecoveryTimer();
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+        setCurrentBehavior(newBehavior);
+        handleBehaviorChange(state, newBehavior);
+    }
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+    /**
+     * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)}
+     * for messages which are not handled by this class. Subclasses overriding this class should fall back to this
+     * implementation for messages which they do not handle
+     *
+     * @param message Incoming command message
+     */
+    protected void handleNonRaftCommand(final Object message) {
+        unhandled(message);
+    }
 
-        // Create a replicated log with the snapshot information
-        // The replicated log can be used later on to retrieve this snapshot
-        // when we need to install it on a peer
-        replicatedLog = new ReplicatedLogImpl(snapshot);
+    /**
+     * Handles a message.
+     *
+     * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
+     * {@link #handleNonRaftCommand(Object)} instead.
+     */
+    @Deprecated
+    @Override
+    // FIXME: make this method final once our unit tests do not need to override it
+    protected void handleCommand(final Object message) {
+        if (serverConfigurationSupport.handleMessage(message, getSender())) {
+            return;
+        }
+        if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
+            return;
+        }
+        if (message instanceof ApplyState) {
+            ApplyState applyState = (ApplyState) message;
 
-        context.setReplicatedLog(replicatedLog);
-        context.setLastApplied(snapshot.getLastAppliedIndex());
-        context.setCommitIndex(snapshot.getLastAppliedIndex());
+            if (!hasFollowers()) {
+                // for single node, the capture should happen after the apply state
+                // as we delete messages from the persistent journal which have made it to the snapshot
+                // capturing the snapshot before applying makes the persistent journal and snapshot out of sync
+                // and recovery shows data missing
+                context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
 
-        Stopwatch timer = Stopwatch.createStarted();
+                context.getSnapshotManager().trimLog(context.getLastApplied());
+            }
 
-        // Apply the snapshot to the actors state
-        applyRecoverySnapshot(snapshot.getState());
+            possiblyHandleBehaviorMessage(message);
+        } else if (message instanceof ApplyJournalEntries) {
+            ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
+            LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-        timer.stop();
-        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog.size(), persistenceId(), timer.toString(),
-                replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
-    }
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+        } else if (message instanceof FindLeader) {
+            getSender().tell(
+                new FindLeaderReply(getLeaderAddress()),
+                getSelf()
+            );
+        } else if (message instanceof GetOnDemandRaftState) {
+            onGetOnDemandRaftStats();
+        } else if (message instanceof InitiateCaptureSnapshot) {
+            captureSnapshot();
+        } else if (message instanceof SwitchBehavior) {
+            switchBehavior((SwitchBehavior) message);
+        } else if (message instanceof LeaderTransitioning) {
+            onLeaderTransitioning((LeaderTransitioning)message);
+        } else if (message instanceof Shutdown) {
+            onShutDown();
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
+        } else if (message instanceof NoopPayload) {
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
+        } else if (!possiblyHandleBehaviorMessage(message)) {
+            handleNonRaftCommand(message);
         }
-
-        replicatedLog.append(logEntry);
     }
 
-    private void onRecoveredApplyLogEntries(long toIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, toIndex);
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
         }
 
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog.get(i));
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+    }
+
+    private boolean possiblyHandleBehaviorMessage(final Object message) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+        // A behavior indicates that it processed the change by returning a reference to the next behavior
+        // to be used. A null return indicates it has not processed the message and we should be passing it to
+        // the subclass for handling.
+        final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+        if (nextBehavior != null) {
+            switchBehavior(state, nextBehavior);
+            return true;
         }
 
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
+        return false;
     }
 
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
+        LOG.debug("{}: Initiating leader transfer", persistenceId());
 
-        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(currentRecoveryBatchCount == 0) {
-            startLogRecoveryBatch(batchSize);
-        }
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+        if (leadershipTransferInProgress == null) {
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
+            leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
+                }
+
+                @Override
+                public void onFailure(ActorRef raftActorRef) {
+                    context.setRaftActorLeadershipTransferCohort(null);
+                }
+            });
 
-        appendRecoveredLogEntry(logEntry.getData());
+            leadershipTransferInProgress.addOnComplete(onComplete);
 
-        if(++currentRecoveryBatchCount >= batchSize) {
-            endCurrentLogRecoveryBatch();
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
+            leadershipTransferInProgress.init();
+
+        } else {
+            LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+            leadershipTransferInProgress.addOnComplete(onComplete);
         }
     }
 
-    private void endCurrentLogRecoveryBatch() {
-        applyCurrentLogRecoveryBatch();
-        currentRecoveryBatchCount = 0;
-    }
+    private void onShutDown() {
+        LOG.debug("{}: onShutDown", persistenceId());
 
-    private void onRecoveryCompletedMessage() {
-        if(currentRecoveryBatchCount > 0) {
-            endCurrentLogRecoveryBatch();
+        if (shuttingDown) {
+            return;
         }
 
-        onRecoveryComplete();
-
-        String recoveryTime = "";
-        if(recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
-            recoveryTimer = null;
+        shuttingDown = true;
+
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
         }
 
-        LOG.info(
-            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
-                "Persistence Id =  " + persistenceId() +
-                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
-                "journal-size={}",
-            replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm(), replicatedLog.size());
+        if (context.hasFollowers()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
 
-        initializeBehavior();
-    }
+                @Override
+                public void onFailure(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
+        } else {
+            pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+                @Override
+                protected void doRun() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
 
-    protected void initializeBehavior(){
-        changeCurrentBehavior(new Follower(context));
+                @Override
+                protected void doCancel() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
+            });
+        }
     }
 
-    protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
-        RaftActorBehavior oldBehavior = currentBehavior;
-        currentBehavior = newBehavior;
-        handleBehaviorChange(oldBehavior, currentBehavior);
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
+        Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
+            roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+                getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
+        }
     }
 
-    @Override public void handleCommand(Object message) {
-        if (message instanceof ApplyState){
-            ApplyState applyState = (ApplyState) message;
-
-            long elapsedTime = (System.nanoTime() - applyState.getStartTime());
-            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
-                LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+    private void switchBehavior(SwitchBehavior message) {
+        if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+            RaftState newState = message.getNewState();
+            if (newState == RaftState.Leader || newState == RaftState.Follower) {
+                switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
+                    AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
+                getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
+            } else {
+                LOG.warn("Switching to behavior : {} - not supported", newState);
             }
+        }
+    }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
+    private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) {
+        setCurrentBehavior(nextBehavior);
+        handleBehaviorChange(oldBehaviorState, nextBehavior);
+    }
 
-            applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                applyState.getReplicatedLogEntry().getData());
+    @VisibleForTesting
+    RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+        return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort());
+    }
 
-        } else if (message instanceof ApplyJournalEntries){
-            ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
-            }
+    private void onGetOnDemandRaftStats() {
+        // Debugging message to retrieve raft stats.
 
-            persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
+        Map<String, String> peerAddresses = new HashMap<>();
+        Map<String, Boolean> peerVotingStates = new HashMap<>();
+        for (PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.isVoting());
+            peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
+        }
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
+        final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+        OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
+                .commitIndex(context.getCommitIndex())
+                .currentTerm(context.getTermInformation().getCurrentTerm())
+                .inMemoryJournalDataSize(replicatedLog().dataSize())
+                .inMemoryJournalLogSize(replicatedLog().size())
+                .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
+                .lastApplied(context.getLastApplied())
+                .lastIndex(replicatedLog().lastIndex())
+                .lastTerm(replicatedLog().lastTerm())
+                .leader(getLeaderId())
+                .raftState(currentBehavior.state().toString())
+                .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
+                .snapshotIndex(replicatedLog().getSnapshotIndex())
+                .snapshotTerm(replicatedLog().getSnapshotTerm())
+                .votedFor(context.getTermInformation().getVotedFor())
+                .isVoting(context.isVotingMember())
+                .peerAddresses(peerAddresses)
+                .peerVotingStates(peerVotingStates)
+                .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
+
+        ReplicatedLogEntry lastLogEntry = replicatedLog().last();
+        if (lastLogEntry != null) {
+            builder.lastLogIndex(lastLogEntry.getIndex());
+            builder.lastLogTerm(lastLogEntry.getTerm());
+        }
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
+        if (getCurrentBehavior() instanceof AbstractLeader) {
+            AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
+            Collection<String> followerIds = leader.getFollowerIds();
+            List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+            for (String id: followerIds) {
+                final FollowerLogInformation info = leader.getFollower(id);
+                followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
+                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+                        context.getPeerInfo(info.getId()).isVoting()));
             }
 
-            applySnapshot(snapshot.getState());
+            builder.followerInfoList(followerInfoList);
+        }
 
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            replicatedLog = new ReplicatedLogImpl(snapshot);
-            context.setReplicatedLog(replicatedLog);
-            context.setLastApplied(snapshot.getLastAppliedIndex());
+        sender().tell(builder.build(), self());
 
-        } else if (message instanceof FindLeader) {
-            getSender().tell(
-                new FindLeaderReply(getLeaderAddress()),
-                getSelf()
-            );
+    }
 
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
+    protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
 
-            long sequenceNumber = success.metadata().sequenceNr();
+    private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
+        RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
-            commitSnapshot(sequenceNumber);
+        if (oldBehavior != currentBehavior) {
+            onStateChanged();
+        }
 
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
+        String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId();
+        String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
+        String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
+        // it can happen that the state has not changed but the leader has changed.
+        Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+        if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+                || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+            if (roleChangeNotifier.isPresent()) {
+                roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
+                        currentBehavior.getLeaderPayloadVersion()), getSelf());
+            }
 
-            context.getReplicatedLog().snapshotRollback();
+            onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
-            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                context.getReplicatedLog().getSnapshotIndex(),
-                context.getReplicatedLog().getSnapshotTerm(),
-                context.getReplicatedLog().size());
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
+            if (leadershipTransferInProgress != null) {
+                leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
+            }
 
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
+            serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
+        }
 
-            if(captureSnapshot == null) {
-                captureSnapshot = (CaptureSnapshot)message;
-                createSnapshot();
-            }
+        if (roleChangeNotifier.isPresent()
+                && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
+            roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
+                    currentBehavior.state().name()), getSelf());
+        }
+    }
 
-        } else if (message instanceof CaptureSnapshotReply){
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+    private void handleApplyState(ApplyState applyState) {
+        long startTime = System.nanoTime();
 
-        } else {
-            RaftActorBehavior oldBehavior = currentBehavior;
-            currentBehavior = currentBehavior.handleMessage(getSender(), message);
+        Payload payload = applyState.getReplicatedLogEntry().getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+        }
 
-            handleBehaviorChange(oldBehavior, currentBehavior);
+        if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
         }
-    }
 
-    private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
-        if (oldBehavior != currentBehavior){
-            onStateChanged();
+        long elapsedTime = System.nanoTime() - startTime;
+        if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
         }
 
-        String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
-        String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
+        // Send the ApplyState message back to self to handle further processing asynchronously.
+        self().tell(applyState, self());
+    }
 
-        // it can happen that the state has not changed but the leader has changed.
-        onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
+        return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+    }
 
-        if (getRoleChangeNotifier().isPresent() &&
-                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
-            getRoleChangeNotifier().get().tell(
-                    new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
-                    getSelf());
-        }
+    @Override
+    public long snapshotSequenceNr() {
+        // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
+        // so that we can delete the persistent journal based on the saved sequence-number
+        // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+        // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+        // last-sequence number known to us.
+        return context.getSnapshotManager().getLastSequenceNumber();
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
      *
-     * @param clientActor
-     * @param identifier
-     * @param data
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected void persistData(final ActorRef clientActor, final String identifier,
-        final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
-        }
+        LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog
-                .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
-                        if(!hasFollowers()){
-                            // Increment the Commit Index and the Last Applied values
-                            raftContext.setCommitIndex(replicatedLogEntry.getIndex());
-                            raftContext.setLastApplied(replicatedLogEntry.getIndex());
-
-                            // Apply the state immediately
-                            applyState(clientActor, identifier, data);
-
-                            // Send a ApplyJournalEntries message so that we write the fact that we applied
-                            // the state to durable storage
-                            self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
-
-                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!context.isSnapshotCaptureInitiated()){
-                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
-                                        raftContext.getTermInformation().getCurrentTerm());
-                                raftContext.getReplicatedLog().snapshotCommit();
-                            } else {
-                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
-                                        persistenceId(), getId());
-                            }
-                        } else if (clientActor != null) {
-                            // Send message for replication
-                            currentBehavior.handleMessage(getSelf(),
-                                    new Replicate(clientActor, identifier,
-                                            replicatedLogEntry)
-                            );
-                        }
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
 
-                    }
-                });    }
+            if (!hasFollowers()) {
+                // Increment the Commit Index and the Last Applied values
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
+
+                // Apply the state immediately.
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
+
+                // Send a ApplyJournalEntries message so that we write the fact that we applied
+                // the state to durable storage
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
+
+            } else {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
+
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
+            }
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
+        }
+    }
+
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
 
     protected String getId() {
         return context.getId();
     }
 
+    @VisibleForTesting
+    void setCurrentBehavior(RaftActorBehavior behavior) {
+        context.setCurrentBehavior(behavior);
+    }
+
+    protected RaftActorBehavior getCurrentBehavior() {
+        return context.getCurrentBehavior();
+    }
+
     /**
      * Derived actors can call the isLeader method to check if the current
-     * RaftActor is the Leader or not
+     * RaftActor is the Leader or not.
      *
      * @return true it this RaftActor is a Leader false otherwise
      */
     protected boolean isLeader() {
-        return context.getId().equals(currentBehavior.getLeaderId());
+        return context.getId().equals(getCurrentBehavior().getLeaderId());
+    }
+
+    protected final boolean isLeaderActive() {
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+                && !shuttingDown && !isLeadershipTransferInProgress();
+    }
+
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+        return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
     /**
@@ -490,10 +672,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    protected ActorSelection getLeader(){
+    public ActorSelection getLeader() {
         String leaderAddress = getLeaderAddress();
 
-        if(leaderAddress == null){
+        if (leaderAddress == null) {
             return null;
         }
 
@@ -501,69 +683,116 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
+     * Returns the id of the current leader.
      *
      * @return the current leader's id
      */
-    protected String getLeaderId(){
-        return currentBehavior.getLeaderId();
+    protected final String getLeaderId() {
+        return getCurrentBehavior().getLeaderId();
     }
 
-    protected RaftState getRaftState() {
-        return currentBehavior.state();
+    @VisibleForTesting
+    protected final RaftState getRaftState() {
+        return getCurrentBehavior().state();
     }
 
-    protected ReplicatedLogEntry getLastLogEntry() {
-        return replicatedLog.last();
+    protected Long getCurrentTerm() {
+        return context.getTermInformation().getCurrentTerm();
     }
 
-    protected Long getCurrentTerm(){
-        return context.getTermInformation().getCurrentTerm();
+    protected RaftActorContext getRaftActorContext() {
+        return context;
     }
 
-    protected Long getCommitIndex(){
-        return context.getCommitIndex();
+    protected void updateConfigParams(ConfigParams configParams) {
+
+        // obtain the RaftPolicy for oldConfigParams and the updated one.
+        String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+        String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
+
+        LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
+            oldRaftPolicy, newRaftPolicy);
+        context.setConfigParams(configParams);
+        if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
+            // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+            // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+            // avoids potential disruption. Otherwise, switch to Follower normally.
+            RaftActorBehavior behavior = getCurrentBehavior();
+            if (behavior != null && behavior.state() == RaftState.Follower) {
+                String previousLeaderId = behavior.getLeaderId();
+                short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
+
+                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+                        previousLeaderId);
+
+                changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
+            } else {
+                initializeBehavior();
+            }
+        }
     }
 
-    protected Long getLastApplied(){
-        return context.getLastApplied();
+    public final DataPersistenceProvider persistence() {
+        return delegatingPersistenceProvider.getDelegate();
     }
 
-    protected RaftActorContext getRaftActorContext() {
-        return context;
+    public void setPersistence(DataPersistenceProvider provider) {
+        delegatingPersistenceProvider.setDelegate(provider);
     }
 
-    protected void updateConfigParams(ConfigParams configParams) {
-        context.setConfigParams(configParams);
+    protected void setPersistence(boolean persistent) {
+        DataPersistenceProvider currentPersistence = persistence();
+        if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
+            setPersistence(new PersistentDataProvider(this));
+
+            if (getCurrentBehavior() != null) {
+                LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+                captureSnapshot();
+            }
+        } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+            setPersistence(new NonPersistentDataProvider() {
+                /**
+                 * The way snapshotting works is,
+                 * <ol>
+                 * <li> RaftActor calls createSnapshot on the Shard
+                 * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+                 * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+                 * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+                 * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+                 * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+                 * </ol>
+                 */
+                @Override
+                public void saveSnapshot(Object object) {
+                    // Make saving Snapshot successful
+                    // Committing the snapshot here would end up calling commit in the creating state which would
+                    // be a state violation. That's why now we send a message to commit the snapshot.
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
+                }
+            });
+        }
     }
 
     /**
      * setPeerAddress sets the address of a known peer at a later time.
+     *
      * <p>
      * This is to account for situations where a we know that a peer
      * exists but we do not know an address up-front. This may also be used in
      * situations where a known peer starts off in a different location and we
      * need to change it's address
+     *
      * <p>
      * Note that if the peerId does not match the list of peers passed to
      * this actor during construction an IllegalStateException will be thrown.
-     *
-     * @param peerId
-     * @param peerAddress
      */
-    protected void setPeerAddress(String peerId, String peerAddress){
+    protected void setPeerAddress(String peerId, String peerAddress) {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getReplicatedLog().snapshotCommit();
-
-        // TODO: Not sure if we want to be this aggressive with trimming stuff
-        trimPersistentData(sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
-     * needs to be applied to the actor's state
+     * needs to be applied to the actor's state.
      *
      * @param clientActor A reference to the client who sent this message. This
      *                    is the same reference that was passed to persistData
@@ -578,35 +807,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * @param data        A piece of data that was persisted by the persistData call.
      *                    This should NEVER be null.
      */
-    protected abstract void applyState(ActorRef clientActor, String identifier,
-        Object data);
+    protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
 
     /**
-     * This method is called during recovery at the start of a batch of state entries. Derived
-     * classes should perform any initialization needed to start a batch.
+     * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
      */
-    protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
-    /**
-     * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startLogRecoveryBatch}.
-     *
-     * @param data the state data
-     */
-    protected abstract void appendRecoveredLogEntry(Payload data);
-
-    /**
-     * This method is called during recovery to reconstruct the state of the actor.
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
-    /**
-     * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
-     */
-    protected abstract void applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
 
     /**
      * This method is called when recovery is complete.
@@ -614,24 +821,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -640,358 +833,203 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract void onStateChanged();
 
-    protected abstract DataPersistenceProvider persistence();
-
     /**
-     * Notifier Actor for this RaftActor to notify when a role change happens
+     * Notifier Actor for this RaftActor to notify when a role change happens.
+     *
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
 
-    protected void onLeaderChanged(String oldLeader, String newLeader){};
+    /**
+     * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+     * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+     * work prior to performing the operation. On completion of any work, the run method must be called on the
+     * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+     * this actor's thread dispatcher as as it modifies internal state.
+     *
+     * <p>
+     * The default implementation immediately runs the operation.
+     *
+     * @param operation the operation to run
+     */
+    protected void pauseLeader(Runnable operation) {
+        operation.run();
+    }
 
-    private void trimPersistentData(long sequenceNumber) {
-        // Trim akka snapshots
-        // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
-        // For now guessing that it is ANDed.
-        persistence().deleteSnapshots(new SnapshotSelectionCriteria(
-            sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+    /**
+     * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+     * should resume normal operations.
+     *
+     * <p>
+     * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+     */
+    protected void unpauseLeader() {
+
+    }
 
-        // Trim akka journal
-        persistence().deleteMessages(sequenceNumber);
+    protected void onLeaderChanged(String oldLeader, String newLeader) {
     }
 
-    private String getLeaderAddress(){
-        if(isLeader()){
+    private String getLeaderAddress() {
+        if (isLeader()) {
             return getSelf().path().toString();
         }
-        String leaderId = currentBehavior.getLeaderId();
+        String leaderId = getLeaderId();
         if (leaderId == null) {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
-                    persistenceId(), leaderId, peerAddress);
-        }
+        LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
 
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        // create a snapshot object from the state provided and save it
-        // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
-        Snapshot sn = Snapshot.create(snapshotBytes,
-            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
-            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
-        persistence().saveSnapshot(sn);
-
-        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
-        long dataThreshold = Runtime.getRuntime().totalMemory() *
-                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-        if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
-                        persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                        captureSnapshot.getLastAppliedIndex());
-            }
-
-            // if memory is less, clear the log based on lastApplied.
-            // this could/should only happen if one of the followers is down
-            // as normally we keep removing from the log when its replicated to all.
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
-                    captureSnapshot.getLastAppliedTerm());
-
-            // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
-            // install snapshot to a follower.
-            if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
-                getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-            }
-        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
-            // clear the log based on replicatedToAllIndex
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
-                    captureSnapshot.getReplicatedToAllTerm());
-
-            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-        } else {
-            // The replicatedToAllIndex was not found in the log
-            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
-            // In this scenario we may need to save the snapshot to the akka persistence
-            // snapshot for recovery but we do not need to do the replicated log trimming.
-            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
-                    replicatedLog.getSnapshotTerm());
-        }
-
-
-        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-            "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm());
-
-        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
-            // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
-                    ByteString.copyFrom(snapshotBytes)));
-        }
-
-        captureSnapshot = null;
-        context.setSnapshotCaptureInitiated(false);
-    }
-
-    protected boolean hasFollowers(){
-        return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
+    protected boolean hasFollowers() {
+        return getRaftActorContext().hasFollowers();
     }
 
-    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+    private void captureSnapshot() {
+        SnapshotManager snapshotManager = context.getSnapshotManager();
 
-        private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0;
+        if (!snapshotManager.isCapturing()) {
+            final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+            LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
+                replicatedLog().last(), idx);
 
-        public ReplicatedLogImpl(Snapshot snapshot) {
-            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
-                snapshot.getUnAppliedEntries());
+            snapshotManager.capture(replicatedLog().last(), idx);
         }
+    }
 
-        public ReplicatedLogImpl() {
-            super();
-        }
-
-        @Override public void removeFromAndPersist(long logEntryIndex) {
-            int adjustedIndex = adjustedIndex(logEntryIndex);
-
-            if (adjustedIndex < 0) {
-                return;
-            }
-
-            // FIXME: Maybe this should be done after the command is saved
-            journal.subList(adjustedIndex , journal.size()).clear();
-
-            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
+    /**
+     * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
+     * in which case we need to step down.
+     */
+    void becomeNonVoting() {
+        if (isLeader()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
-                public void apply(DeleteEntries param)
-                        throws Exception {
-                    //FIXME : Doing nothing for now
-                    dataSize = 0;
-                    for (ReplicatedLogEntry entry : journal) {
-                        dataSize += entry.size();
-                    }
+                public void onSuccess(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+                    ensureFollowerState();
                 }
-            });
-        }
-
-        @Override public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry) {
-            appendAndPersist(replicatedLogEntry, null);
-        }
-
-        public void appendAndPersist(
-            final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback)  {
 
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
-            }
+                @Override
+                public void onFailure(ActorRef raftActorRef) {
+                    LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+                    ensureFollowerState();
+                }
 
-            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-            journal.add(replicatedLogEntry);
-
-            // When persisting events with persist it is guaranteed that the
-            // persistent actor will not receive further commands between the
-            // persist call and the execution(s) of the associated event
-            // handler. This also holds for multiple persist calls in context
-            // of a single command.
-            persistence().persist(replicatedLogEntry,
-                new Procedure<ReplicatedLogEntry>() {
-                    @Override
-                    public void apply(ReplicatedLogEntry evt) throws Exception {
-                        int logEntrySize = replicatedLogEntry.size();
-
-                        dataSize += logEntrySize;
-                        long dataSizeForCheck = dataSize;
-
-                        dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex() + 1;
-
-                        if(!hasFollowers()) {
-                            // When we do not have followers we do not maintain an in-memory log
-                            // due to this the journalSize will never become anything close to the
-                            // snapshot batch count. In fact will mostly be 1.
-                            // Similarly since the journal's dataSize depends on the entries in the
-                            // journal the journal's dataSize will never reach a value close to the
-                            // memory threshold.
-                            // By maintaining the dataSize outside the journal we are tracking essentially
-                            // what we have written to the disk however since we no longer are in
-                            // need of doing a snapshot just for the sake of freeing up memory we adjust
-                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
-                            // as if we were maintaining a real snapshot
-                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
-                        }
-
-                        long dataThreshold = Runtime.getRuntime().totalMemory() *
-                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        // when a snaphsot is being taken, captureSnapshot != null
-                        if (!context.isSnapshotCaptureInitiated() &&
-                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSizeForCheck > dataThreshold)) {
-
-                            dataSizeSinceLastSnapshot = 0;
-
-                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
-                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
-
-                            long lastAppliedIndex = -1;
-                            long lastAppliedTerm = -1;
-
-                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (!hasFollowers()) {
-                                lastAppliedIndex = replicatedLogEntry.getIndex();
-                                lastAppliedTerm = replicatedLogEntry.getTerm();
-                            } else if (lastAppliedEntry != null) {
-                                lastAppliedIndex = lastAppliedEntry.getIndex();
-                                lastAppliedTerm = lastAppliedEntry.getTerm();
-                            }
-
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
-                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
-                                        persistenceId(), context.getLastApplied());
-                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
-                                        lastAppliedIndex);
-                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
-                                        lastAppliedTerm);
-                            }
-
-                            // send a CaptureSnapshot to self to make the expensive operation async.
-                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
-                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
-                                null);
-                            context.setSnapshotCaptureInitiated(true);
-                        }
-                        if (callback != null){
-                            callback.apply(replicatedLogEntry);
-                        }
+                private void ensureFollowerState() {
+                    // Whether or not leadership transfer succeeded, we have to step down as leader and
+                    // switch to Follower so ensure that.
+                    if (getRaftState() != RaftState.Follower) {
+                        initializeBehavior();
                     }
                 }
-            );
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
-
     }
 
-    static class DeleteEntries implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final int fromIndex;
+    /**
+     * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
+     */
+    private abstract static class BehaviorState implements Immutable {
+        @Nullable abstract RaftActorBehavior getBehavior();
 
-        public DeleteEntries(int fromIndex) {
-            this.fromIndex = fromIndex;
-        }
+        @Nullable abstract String getLastValidLeaderId();
 
-        public int getFromIndex() {
-            return fromIndex;
-        }
-    }
+        @Nullable abstract String getLastLeaderId();
 
+        @Nullable abstract short getLeaderPayloadVersion();
+    }
 
-    private class ElectionTermImpl implements ElectionTerm {
-        /**
-         * Identifier of the actor whose election term information this is
-         */
-        private long currentTerm = 0;
-        private String votedFor = null;
+    /**
+     * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state.
+     */
+    private static final class SimpleBehaviorState extends BehaviorState {
+        private final RaftActorBehavior behavior;
+        private final String lastValidLeaderId;
+        private final String lastLeaderId;
+        private final short leaderPayloadVersion;
+
+        SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
+                final RaftActorBehavior behavior) {
+            this.lastValidLeaderId = lastValidLeaderId;
+            this.lastLeaderId = lastLeaderId;
+            this.behavior = Preconditions.checkNotNull(behavior);
+            this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+        }
 
         @Override
-        public long getCurrentTerm() {
-            return currentTerm;
+        RaftActorBehavior getBehavior() {
+            return behavior;
         }
 
         @Override
-        public String getVotedFor() {
-            return votedFor;
+        String getLastValidLeaderId() {
+            return lastValidLeaderId;
         }
 
-        @Override public void update(long currentTerm, String votedFor) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
-            }
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
+        @Override
+        short getLeaderPayloadVersion() {
+            return leaderPayloadVersion;
         }
 
         @Override
-        public void updateAndPersist(long currentTerm, String votedFor){
-            update(currentTerm, votedFor);
-            // FIXME : Maybe first persist then update the state
-            persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
-
-                @Override public void apply(UpdateElectionTerm param)
-                    throws Exception {
-
-                }
-            });
+        String getLastLeaderId() {
+            return lastLeaderId;
         }
     }
 
-    static class UpdateElectionTerm implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final long currentTerm;
-        private final String votedFor;
+    /**
+     * Class tracking behavior-related information, which we need to keep around and pass across behavior switches.
+     * An instance is created for each RaftActor. It has two functions:
+     * - it keeps track of the last leader ID we have encountered since we have been created
+     * - it creates state capture needed to transition from one behavior to the next
+     */
+    private static final class BehaviorStateTracker {
+        /**
+         * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
+         * allowed before we receive the first message, we know the leader ID to be null.
+         */
+        private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
+            @Override
+            RaftActorBehavior getBehavior() {
+                return null;
+            }
 
-        public UpdateElectionTerm(long currentTerm, String votedFor) {
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
-        }
+            @Override
+            String getLastValidLeaderId() {
+                return null;
+            }
 
-        public long getCurrentTerm() {
-            return currentTerm;
-        }
+            @Override
+            short getLeaderPayloadVersion() {
+                return -1;
+            }
 
-        public String getVotedFor() {
-            return votedFor;
-        }
-    }
+            @Override
+            String getLastLeaderId() {
+                return null;
+            }
+        };
 
-    protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
+        private String lastValidLeaderId;
+        private String lastLeaderId;
 
-        public NonPersistentRaftDataProvider(){
+        BehaviorState capture(final RaftActorBehavior behavior) {
+            if (behavior == null) {
+                Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+                return NULL_BEHAVIOR_STATE;
+            }
 
-        }
+            lastLeaderId = behavior.getLeaderId();
+            if (lastLeaderId != null) {
+                lastValidLeaderId = lastLeaderId;
+            }
 
-        /**
-         * The way snapshotting works is,
-         * <ol>
-         * <li> RaftActor calls createSnapshot on the Shard
-         * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
-         * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
-         * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
-         * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
-         * in SaveSnapshotSuccess.
-         * </ol>
-         * @param o
-         */
-        @Override
-        public void saveSnapshot(Object o) {
-            // Make saving Snapshot successful
-            commitSnapshot(-1L);
+            return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
         }
     }
 
-    @VisibleForTesting
-    void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
-        currentBehavior = behavior;
-    }
-
-    protected RaftActorBehavior getCurrentBehavior() {
-        return currentBehavior;
-    }
-
 }