Add MXBean to report shard registered DTCL/DCL info
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index b207e0b72547158b092156f1cf324b13237e7037..9441f28fbc2f51a64716a02295a87087a801f68b 100644 (file)
@@ -12,12 +12,12 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.collect.Lists;
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -34,8 +34,8 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -50,18 +50,22 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.concepts.Immutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * RaftActor encapsulates a state machine that needs to be kept synchronized
  * in a cluster. It implements the RAFT algorithm as described in the paper
  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
  * In Search of an Understandable Consensus Algorithm</a>
- * <p/>
+ *
+ * <p>
  * RaftActor has 3 states and each state has a certain behavior associated
  * with it. A Raft actor can behave as,
  * <ul>
@@ -69,27 +73,26 @@ import org.slf4j.LoggerFactory;
  * <li> A Follower (or) </li>
  * <li> A Candidate </li>
  * </ul>
- * <p/>
- * <p/>
+ *
+ * <p>
  * A RaftActor MUST be a Leader in order to accept requests from clients to
  * change the state of it's encapsulated state machine. Once a RaftActor becomes
  * a Leader it is also responsible for ensuring that all followers ultimately
  * have the same log and therefore the same state machine as itself.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The current behavior of a RaftActor determines how election for leadership
  * is initiated and how peer RaftActors react to request for votes.
- * <p/>
- * <p/>
+ *
+ * <p>
  * Each RaftActor also needs to know the current election term. It uses this
  * information for a couple of things. One is to simply figure out who it
  * voted for in the last election. Another is to figure out if the message
  * it received to update it's state is stale.
- * <p/>
- * <p/>
+ *
+ * <p>
  * The RaftActor uses akka-persistence to store it's replicated log.
  * Furthermore through it's behaviors a Raft Actor determines
- * <p/>
  * <ul>
  * <li> when a log entry should be persisted </li>
  * <li> when a log entry should be applied to the state machine (and) </li>
@@ -100,11 +103,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    protected final Logger LOG = LoggerFactory.getLogger(getClass());
-
     /**
      * This context should NOT be passed directly to any other actor it is
-     * only to be consumed by the RaftActorBehaviors
+     * only to be consumed by the RaftActorBehaviors.
      */
     private final RaftActorContextImpl context;
 
@@ -120,11 +121,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
-    public RaftActor(String id, Map<String, String> peerAddresses,
+    protected RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
         persistentProvider = new PersistentDataProvider(this);
@@ -133,8 +132,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context = new RaftActorContextImpl(this.getSelf(),
             this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG),
             -1, -1, peerAddresses,
-            (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
-            delegatingPersistenceProvider, LOG);
+            configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(),
+            delegatingPersistenceProvider, this::handleApplyState, LOG);
 
         context.setPayloadVersion(payloadVersion);
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
@@ -159,24 +158,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     protected void handleRecover(Object message) {
-        if(raftRecovery == null) {
+        if (raftRecovery == null) {
             raftRecovery = newRaftActorRecoverySupport();
         }
 
         boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider);
-        if(recoveryComplete) {
+        if (recoveryComplete) {
             onRecoveryComplete();
 
             initializeBehavior();
 
             raftRecovery = null;
-
-            if (context.getReplicatedLog().size() > 0) {
-                self().tell(new InitiateCaptureSnapshot(), self());
-                LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
-            } else {
-                LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
-            }
         }
     }
 
@@ -185,11 +177,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     @VisibleForTesting
-    void initializeBehavior(){
+    void initializeBehavior() {
         changeCurrentBehavior(new Follower(context));
     }
 
     @VisibleForTesting
+    @SuppressWarnings("checkstyle:IllegalCatch")
     protected void changeCurrentBehavior(RaftActorBehavior newBehavior) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         if (currentBehavior != null) {
@@ -217,6 +210,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
+     * Handles a message.
+     *
      * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override
      * {@link #handleNonRaftCommand(Object)} instead.
      */
@@ -230,27 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         if (snapshotSupport.handleSnapshotMessage(message, getSender())) {
             return;
         }
-
         if (message instanceof ApplyState) {
             ApplyState applyState = (ApplyState) message;
 
-            long startTime = System.nanoTime();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Applying state for log index {} data {}",
-                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
-                    applyState.getReplicatedLogEntry().getData());
-            }
-
-            applyState(applyState.getClientActor(), applyState.getIdentifier(),
-                applyState.getReplicatedLogEntry().getData());
-
-            long elapsedTime = System.nanoTime() - startTime;
-            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
-                LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
-                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
-            }
-
             if (!hasFollowers()) {
                 // for single node, the capture should happen after the apply state
                 // as we delete messages from the persistent journal which have made it to the snapshot
@@ -261,67 +238,119 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 context.getSnapshotManager().trimLog(context.getLastApplied());
             }
 
+            possiblyHandleBehaviorMessage(message);
         } else if (message instanceof ApplyJournalEntries) {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
-            }
+            LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-            persistence().persist(applyEntries, NoopProcedure.instance());
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-        } else if(message instanceof GetOnDemandRaftState) {
+        } else if (message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if(message instanceof InitiateCaptureSnapshot) {
+        } else if (message instanceof InitiateCaptureSnapshot) {
             captureSnapshot();
-        } else if(message instanceof SwitchBehavior) {
-            switchBehavior(((SwitchBehavior) message));
-        } else if(message instanceof LeaderTransitioning) {
-            onLeaderTransitioning();
-        } else if(message instanceof Shutdown) {
+        } else if (message instanceof SwitchBehavior) {
+            switchBehavior((SwitchBehavior) message);
+        } else if (message instanceof LeaderTransitioning) {
+            onLeaderTransitioning((LeaderTransitioning)message);
+        } else if (message instanceof Shutdown) {
             onShutDown();
-        } else if(message instanceof Runnable) {
+        } else if (message instanceof Runnable) {
             ((Runnable)message).run();
-        } else {
-            // Processing the message may affect the state, hence we need to capture it
-            final RaftActorBehavior currentBehavior = getCurrentBehavior();
-            final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
-
-            // A behavior indicates that it processed the change by returning a reference to the next behavior
-            // to be used. A null return indicates it has not processed the message and we should be passing it to
-            // the subclass for handling.
-            final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
-            if (nextBehavior != null) {
-                switchBehavior(state, nextBehavior);
-            } else {
-                handleNonRaftCommand(message);
+        } else if (message instanceof NoopPayload) {
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
+        } else if (!possiblyHandleBehaviorMessage(message)) {
+            handleNonRaftCommand(message);
+        }
+    }
+
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
             }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+    }
+
+    private boolean possiblyHandleBehaviorMessage(final Object message) {
+        final RaftActorBehavior currentBehavior = getCurrentBehavior();
+        final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+        // A behavior indicates that it processed the change by returning a reference to the next behavior
+        // to be used. A null return indicates it has not processed the message and we should be passing it to
+        // the subclass for handling.
+        final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+        if (nextBehavior != null) {
+            switchBehavior(state, nextBehavior);
+            return true;
         }
+
+        return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
-        if(leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+        if (leadershipTransferInProgress == null) {
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
                 public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -331,17 +360,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void onShutDown() {
         LOG.debug("{}: onShutDown", persistenceId());
 
-        if(shuttingDown) {
+        if (shuttingDown) {
             return;
         }
 
         shuttingDown = true;
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        if (currentBehavior.state() != RaftState.Leader) {
-            // For non-leaders shutdown is a no-op
-            self().tell(PoisonPill.getInstance(), self());
-            return;
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
         }
 
         if (context.hasFollowers()) {
@@ -357,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -373,19 +407,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private void onLeaderTransitioning() {
-        LOG.debug("{}: onLeaderTransitioning", persistenceId());
+    private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+        LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+        if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+                && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
             roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
                 getCurrentBehavior().getLeaderPayloadVersion()), getSelf());
         }
     }
 
     private void switchBehavior(SwitchBehavior message) {
-        if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
+        if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) {
             RaftState newState = message.getNewState();
-            ifnewState == RaftState.Leader || newState == RaftState.Follower) {
+            if (newState == RaftState.Leader || newState == RaftState.Follower) {
                 switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
                     AbstractRaftActorBehavior.createBehavior(context, message.getNewState()));
                 getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), "");
@@ -410,13 +445,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
         Map<String, String> peerAddresses = new HashMap<>();
         Map<String, Boolean> peerVotingStates = new HashMap<>();
-        for(PeerInfo info: context.getPeers()) {
-            peerVotingStates.put(info.getId(), info.getVotingState() != VotingState.NON_VOTING);
-            peerAddresses.put(info.getId(), info.getAddress());
+        for (PeerInfo info: context.getPeers()) {
+            peerVotingStates.put(info.getId(), info.isVoting());
+            peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : "");
         }
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+        OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
                 .commitIndex(context.getCommitIndex())
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog().dataSize())
@@ -442,14 +477,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             builder.lastLogTerm(lastLogEntry.getTerm());
         }
 
-        if(getCurrentBehavior() instanceof AbstractLeader) {
+        if (getCurrentBehavior() instanceof AbstractLeader) {
             AbstractLeader leader = (AbstractLeader)getCurrentBehavior();
             Collection<String> followerIds = leader.getFollowerIds();
             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
-            for(String id: followerIds) {
+            for (String id: followerIds) {
                 final FollowerLogInformation info = leader.getFollower(id);
                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
-                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+                        info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()),
+                        context.getPeerInfo(info.getId()).isVoting()));
             }
 
             builder.followerInfoList(followerInfoList);
@@ -459,41 +495,71 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     }
 
+    protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
+        return OnDemandRaftState.builder();
+    }
+
     private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) {
         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
 
-        if (oldBehavior != currentBehavior){
+        if (oldBehavior != currentBehavior) {
             onStateChanged();
         }
 
+        String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId();
         String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId();
         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
 
         // it can happen that the state has not changed but the leader has changed.
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
-        if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) ||
-           oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
-            if(roleChangeNotifier.isPresent()) {
+        if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId())
+                || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) {
+            if (roleChangeNotifier.isPresent()) {
                 roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(),
                         currentBehavior.getLeaderPayloadVersion()), getSelf());
             }
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
-            if(leadershipTransferInProgress != null) {
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
+            if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
 
             serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
         }
 
-        if (roleChangeNotifier.isPresent() &&
-                (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
+        if (roleChangeNotifier.isPresent()
+                && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) {
             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
                     currentBehavior.state().name()), getSelf());
         }
     }
 
+    private void handleApplyState(ApplyState applyState) {
+        long startTime = System.nanoTime();
+
+        Payload payload = applyState.getReplicatedLogEntry().getData();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: Applying state for log index {} data {}",
+                persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload);
+        }
+
+        if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
+            applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
+        }
+
+        long elapsedTime = System.nanoTime() - startTime;
+        if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
+            LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+        }
+
+        // Send the ApplyState message back to self to handle further processing asynchronously.
+        self().tell(applyState, self());
+    }
+
     protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
         return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
     }
@@ -502,53 +568,64 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     public long snapshotSequenceNr() {
         // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
         // so that we can delete the persistent journal based on the saved sequence-number
-        // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot
-        // was saved and not the number we saved.
-        // We would want to override it , by asking akka to use the last-sequence number known to us.
+        // However , when akka replays the journal during recovery, it replays it from the sequence number when the
+        // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+        // last-sequence number known to us.
         return context.getSnapshotManager().getLastSequenceNumber();
     }
 
     /**
-     * When a derived RaftActor needs to persist something it must call
-     * persistData.
+     * Persists the given Payload in the journal and replicates to any followers. After successful completion,
+     * {@link #applyState(ActorRef, Identifier, Object)} is notified.
      *
-     * @param clientActor
-     * @param identifier
-     * @param data
+     * @param clientActor optional ActorRef that is provided via the applyState callback
+     * @param identifier the payload identifier
+     * @param data the payload data to persist
+     * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with
+     *        subsequent payloads for efficiency. Otherwise the payload is immediately replicated.
      */
-    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) {
-
-        ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
+    protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data,
+            final boolean batchHint) {
+        ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
-        }
+        LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
-            if (!hasFollowers()){
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
+
+            if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
-                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
-                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+                handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
-                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
 
-            } else if (clientActor != null) {
-                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+            } else {
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
-                // Send message for replication
-                getCurrentBehavior().handleMessage(getSelf(),
-                        new Replicate(clientActor, identifier, replicatedLogEntry1));
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
             }
-        });
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+                    !batchHint));
+        }
     }
 
     private ReplicatedLog replicatedLog() {
@@ -570,7 +647,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     /**
      * Derived actors can call the isLeader method to check if the current
-     * RaftActor is the Leader or not
+     * RaftActor is the Leader or not.
      *
      * @return true it this RaftActor is a Leader false otherwise
      */
@@ -579,11 +656,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected final boolean isLeaderActive() {
-        return getRaftState() != RaftState.IsolatedLeader && !shuttingDown &&
-                !isLeadershipTransferInProgress();
+        return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+                && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -594,10 +672,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      *
      * @return A reference to the leader if known, null otherwise
      */
-    protected ActorSelection getLeader(){
+    public ActorSelection getLeader() {
         String leaderAddress = getLeaderAddress();
 
-        if(leaderAddress == null){
+        if (leaderAddress == null) {
             return null;
         }
 
@@ -605,10 +683,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     /**
+     * Returns the id of the current leader.
      *
      * @return the current leader's id
      */
-    protected final String getLeaderId(){
+    protected final String getLeaderId() {
         return getCurrentBehavior().getLeaderId();
     }
 
@@ -617,7 +696,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return getCurrentBehavior().state();
     }
 
-    protected Long getCurrentTerm(){
+    protected Long getCurrentTerm() {
         return context.getTermInformation().getCurrentTerm();
     }
 
@@ -628,10 +707,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected void updateConfigParams(ConfigParams configParams) {
 
         // obtain the RaftPolicy for oldConfigParams and the updated one.
-        String oldRaftPolicy = context.getConfigParams().
-            getCustomRaftPolicyImplementationClass();
-        String newRaftPolicy = configParams.
-            getCustomRaftPolicyImplementationClass();
+        String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+        String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
 
         LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
             oldRaftPolicy, newRaftPolicy);
@@ -645,7 +722,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 String previousLeaderId = behavior.getLeaderId();
                 short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
 
-                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId);
+                LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+                        previousLeaderId);
 
                 changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
             } else {
@@ -663,9 +741,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void setPersistence(boolean persistent) {
-        if(persistent) {
+        DataPersistenceProvider currentPersistence = persistence();
+        if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
             setPersistence(new PersistentDataProvider(this));
-        } else {
+
+            if (getCurrentBehavior() != null) {
+                LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+                captureSnapshot();
+            }
+        } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
             setPersistence(new NonPersistentDataProvider() {
                 /**
                  * The way snapshotting works is,
@@ -679,7 +763,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                  * </ol>
                  */
                 @Override
-                public void saveSnapshot(Object o) {
+                public void saveSnapshot(Object object) {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
@@ -691,25 +775,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     /**
      * setPeerAddress sets the address of a known peer at a later time.
+     *
      * <p>
      * This is to account for situations where a we know that a peer
      * exists but we do not know an address up-front. This may also be used in
      * situations where a known peer starts off in a different location and we
      * need to change it's address
+     *
      * <p>
      * Note that if the peerId does not match the list of peers passed to
      * this actor during construction an IllegalStateException will be thrown.
-     *
-     * @param peerId
-     * @param peerAddress
      */
-    protected void setPeerAddress(String peerId, String peerAddress){
+    protected void setPeerAddress(String peerId, String peerAddress) {
         context.setPeerAddress(peerId, peerAddress);
     }
 
     /**
      * The applyState method will be called by the RaftActor when some data
-     * needs to be applied to the actor's state
+     * needs to be applied to the actor's state.
      *
      * @param clientActor A reference to the client who sent this message. This
      *                    is the same reference that was passed to persistData
@@ -738,7 +821,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+     * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
      */
     @Nonnull
     protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
@@ -751,7 +834,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onStateChanged();
 
     /**
-     * Notifier Actor for this RaftActor to notify when a role change happens
+     * Notifier Actor for this RaftActor to notify when a role change happens.
+     *
      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
@@ -762,6 +846,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      * work prior to performing the operation. On completion of any work, the run method must be called on the
      * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
      * this actor's thread dispatcher as as it modifies internal state.
+     *
      * <p>
      * The default implementation immediately runs the operation.
      *
@@ -771,12 +856,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         operation.run();
     }
 
-    protected void onLeaderChanged(String oldLeader, String newLeader) {
+    /**
+     * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+     * should resume normal operations.
+     *
+     * <p>
+     * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+     */
+    protected void unpauseLeader() {
 
-    };
+    }
+
+    protected void onLeaderChanged(String oldLeader, String newLeader) {
+    }
 
-    private String getLeaderAddress(){
-        if(isLeader()){
+    private String getLeaderAddress() {
+        if (isLeader()) {
             return getSelf().path().toString();
         }
         String leaderId = getLeaderId();
@@ -784,15 +879,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             return null;
         }
         String peerAddress = context.getPeerAddress(leaderId);
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
-                    persistenceId(), leaderId, peerAddress);
-        }
+        LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
 
         return peerAddress;
     }
 
-    protected boolean hasFollowers(){
+    protected boolean hasFollowers() {
         return getRaftActorContext().hasFollowers();
     }
 
@@ -834,61 +926,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
-        }
-    }
-
-    /**
-     * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
-     *             whose type for fromIndex is long instead of int. This class was kept for backwards
-     *             compatibility with Helium.
-     */
-    // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
-    @SuppressWarnings("serial")
-    @Deprecated
-    static class DeleteEntries implements Serializable {
-        private final int fromIndex;
-
-        public DeleteEntries(int fromIndex) {
-            this.fromIndex = fromIndex;
-        }
-
-        public int getFromIndex() {
-            return fromIndex;
-        }
-    }
-
-    /**
-     * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
-     *             which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
-     */
-    // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
-    @SuppressWarnings("serial")
-    @Deprecated
-    static class UpdateElectionTerm implements Serializable {
-        private final long currentTerm;
-        private final String votedFor;
-
-        public UpdateElectionTerm(long currentTerm, String votedFor) {
-            this.currentTerm = currentTerm;
-            this.votedFor = votedFor;
-        }
-
-        public long getCurrentTerm() {
-            return currentTerm;
-        }
-
-        public String getVotedFor() {
-            return votedFor;
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }
 
     /**
      * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
      */
-    private static abstract class BehaviorState implements Immutable {
+    private abstract static class BehaviorState implements Immutable {
         @Nullable abstract RaftActorBehavior getBehavior();
+
         @Nullable abstract String getLastValidLeaderId();
+
+        @Nullable abstract String getLastLeaderId();
+
         @Nullable abstract short getLeaderPayloadVersion();
     }
 
@@ -898,10 +949,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private static final class SimpleBehaviorState extends BehaviorState {
         private final RaftActorBehavior behavior;
         private final String lastValidLeaderId;
+        private final String lastLeaderId;
         private final short leaderPayloadVersion;
 
-        SimpleBehaviorState(final String lastValidLeaderId, final RaftActorBehavior behavior) {
+        SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
+                final RaftActorBehavior behavior) {
             this.lastValidLeaderId = lastValidLeaderId;
+            this.lastLeaderId = lastLeaderId;
             this.behavior = Preconditions.checkNotNull(behavior);
             this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
         }
@@ -920,6 +974,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         short getLeaderPayloadVersion() {
             return leaderPayloadVersion;
         }
+
+        @Override
+        String getLastLeaderId() {
+            return lastLeaderId;
+        }
     }
 
     /**
@@ -948,9 +1007,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             short getLeaderPayloadVersion() {
                 return -1;
             }
+
+            @Override
+            String getLastLeaderId() {
+                return null;
+            }
         };
 
         private String lastValidLeaderId;
+        private String lastLeaderId;
 
         BehaviorState capture(final RaftActorBehavior behavior) {
             if (behavior == null) {
@@ -958,12 +1023,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 return NULL_BEHAVIOR_STATE;
             }
 
-            final String leaderId = behavior.getLeaderId();
-            if (leaderId != null) {
-                lastValidLeaderId = leaderId;
+            lastLeaderId = behavior.getLeaderId();
+            if (lastLeaderId != null) {
+                lastValidLeaderId = lastLeaderId;
             }
 
-            return new SimpleBehaviorState(lastValidLeaderId, behavior);
+            return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
         }
     }