X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=933fde98b9b432f60b545541d0b05cc03ef8da0d;hp=e9e0d630d092eeffa1833441936c26cfeea40d17;hb=61e85d54cfcd70053993f910092eba1ab3fcc850;hpb=766df8b58cc1decd7b40e0adbf41d3657fc21f2c diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index e9e0d630d0..933fde98b9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,20 +11,21 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.japi.Procedure; -import akka.persistence.SnapshotSelectionCriteria; +import akka.actor.PoisonPill; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Supplier; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; @@ -36,10 +37,11 @@ import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; -import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -47,7 +49,10 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Immutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,39 +102,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected final Logger LOG = LoggerFactory.getLogger(getClass()); - /** - * The current state determines the current behavior of a RaftActor - * A Raft Actor always starts off in the Follower State - */ - private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior(); - /** * This context should NOT be passed directly to any other actor it is * only to be consumed by the RaftActorBehaviors */ private final RaftActorContextImpl context; - private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); + private final DelegatingPersistentDataProvider delegatingPersistenceProvider; + + private final PersistentDataProvider persistentProvider; + + private final BehaviorStateTracker behaviorStateTracker = new BehaviorStateTracker(); private RaftActorRecoverySupport raftRecovery; private RaftActorSnapshotMessageSupport snapshotSupport; - private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); + private RaftActorServerConfigurationSupport serverConfigurationSupport; - private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier(); + private RaftActorLeadershipTransferCohort leadershipTransferInProgress; + + private boolean shuttingDown; public RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { + persistentProvider = new PersistentDataProvider(this); + delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); + context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG), + this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); - context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, currentBehavior)); + context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); } @Override @@ -140,70 +148,93 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { super.preStart(); snapshotSupport = newRaftActorSnapshotMessageSupport(); + serverConfigurationSupport = new RaftActorServerConfigurationSupport(this); } @Override public void postStop() { - if(currentBehavior.getDelegate() != null) { - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state()); - } - } - + context.close(); super.postStop(); } @Override - public void handleRecover(Object message) { + protected void handleRecover(Object message) { if(raftRecovery == null) { raftRecovery = newRaftActorRecoverySupport(); } - boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message); + boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider); if(recoveryComplete) { - if(!persistence().isRecoveryApplicable()) { - // Delete all the messages from the akka journal so that we do not end up with consistency issues - // Note I am not using the dataPersistenceProvider and directly using the akka api here - deleteMessages(lastSequenceNr()); - - // Delete all the akka snapshots as they will not be needed - deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); - } - onRecoveryComplete(); initializeBehavior(); raftRecovery = null; + + if (context.getReplicatedLog().size() > 0) { + self().tell(new InitiateCaptureSnapshot(), self()); + LOG.info("{}: Snapshot capture initiated after recovery", persistenceId()); + } else { + LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId()); + } } } protected RaftActorRecoverySupport newRaftActorRecoverySupport() { - return new RaftActorRecoverySupport(context, currentBehavior, getRaftActorRecoveryCohort()); + return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort()); } - protected void initializeBehavior(){ + @VisibleForTesting + void initializeBehavior(){ changeCurrentBehavior(new Follower(context)); } - protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - reusableBehaviorStateHolder.init(getCurrentBehavior()); + @VisibleForTesting + protected void changeCurrentBehavior(RaftActorBehavior newBehavior) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + if (currentBehavior != null) { + try { + currentBehavior.close(); + } catch (Exception e) { + LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e); + } + } + + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); setCurrentBehavior(newBehavior); - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + handleBehaviorChange(state, newBehavior); } + /** + * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)} + * for messages which are not handled by this class. Subclasses overriding this class should fall back to this + * implementation for messages which they do not handle + * + * @param message Incoming command message + */ + protected void handleNonRaftCommand(final Object message) { + unhandled(message); + } + + /** + * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override + * {@link #handleNonRaftCommand(Object)} instead. + */ + @Deprecated @Override - public void handleCommand(final Object message) { - if (message instanceof ApplyState){ + // FIXME: make this method final once our unit tests do not need to override it + protected void handleCommand(final Object message) { + if (serverConfigurationSupport.handleMessage(message, getSender())) { + return; + } + if (snapshotSupport.handleSnapshotMessage(message, getSender())) { + return; + } + + if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long elapsedTime = (System.nanoTime() - applyState.getStartTime()); - if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ - LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } + long startTime = System.nanoTime(); if(LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", @@ -214,6 +245,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { applyState(applyState.getClientActor(), applyState.getIdentifier(), applyState.getReplicatedLogEntry().getData()); + long elapsedTime = System.nanoTime() - startTime; + if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -221,13 +258,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // and recovery shows data missing context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry()); - context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior); + context.getSnapshotManager().trimLog(context.getLastApplied()); } - } else if (message instanceof ApplyJournalEntries){ + } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex()); + LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); } persistence().persist(applyEntries, NoopProcedure.instance()); @@ -241,10 +278,107 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onGetOnDemandRaftStats(); } else if(message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if(message instanceof SwitchBehavior){ + } else if(message instanceof SwitchBehavior) { switchBehavior(((SwitchBehavior) message)); - } else if(!snapshotSupport.handleSnapshotMessage(message)) { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + } else if(message instanceof LeaderTransitioning) { + onLeaderTransitioning(); + } else if(message instanceof Shutdown) { + onShutDown(); + } else if(message instanceof Runnable) { + ((Runnable)message).run(); + } else { + // Processing the message may affect the state, hence we need to capture it + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); + + // A behavior indicates that it processed the change by returning a reference to the next behavior + // to be used. A null return indicates it has not processed the message and we should be passing it to + // the subclass for handling. + final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); + if (nextBehavior != null) { + switchBehavior(state, nextBehavior); + } else { + handleNonRaftCommand(message); + } + } + } + + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + LOG.debug("{}: Initiating leader transfer", persistenceId()); + + if(leadershipTransferInProgress == null) { + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); + leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef) { + leadershipTransferInProgress = null; + } + + @Override + public void onFailure(ActorRef raftActorRef) { + leadershipTransferInProgress = null; + } + }); + + leadershipTransferInProgress.addOnComplete(onComplete); + leadershipTransferInProgress.init(); + } else { + LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); + leadershipTransferInProgress.addOnComplete(onComplete); + } + } + + private void onShutDown() { + LOG.debug("{}: onShutDown", persistenceId()); + + if(shuttingDown) { + return; + } + + shuttingDown = true; + + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); + if (currentBehavior.state() != RaftState.Leader) { + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; + } + + if (context.hasFollowers()) { + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef) { + LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); + raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); + } + + @Override + public void onFailure(ActorRef raftActorRef) { + LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); + raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); + } + }); + } else { + pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { + @Override + protected void doRun() { + self().tell(PoisonPill.getInstance(), self()); + } + + @Override + protected void doCancel() { + self().tell(PoisonPill.getInstance(), self()); + } + }); + } + } + + private void onLeaderTransitioning() { + LOG.debug("{}: onLeaderTransitioning", persistenceId()); + Optional roleChangeNotifier = getRoleChangeNotifier(); + if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, + getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } @@ -252,7 +386,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); if( newState == RaftState.Leader || newState == RaftState.Follower) { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), + AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); } else { LOG.warn("Switching to behavior : {} - not supported", newState); @@ -260,22 +395,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void switchBehavior(Supplier supplier){ - reusableBehaviorStateHolder.init(getCurrentBehavior()); - - setCurrentBehavior(supplier.get()); - - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) { + setCurrentBehavior(nextBehavior); + handleBehaviorChange(oldBehaviorState, nextBehavior); } - protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { - return new RaftActorSnapshotMessageSupport(context, currentBehavior, - getRaftActorSnapshotCohort()); + @VisibleForTesting + RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { + return new RaftActorSnapshotMessageSupport(context, getRaftActorSnapshotCohort()); } private void onGetOnDemandRaftStats() { // Debugging message to retrieve raft stats. + Map peerAddresses = new HashMap<>(); + Map peerVotingStates = new HashMap<>(); + for(PeerInfo info: context.getPeers()) { + peerVotingStates.put(info.getId(), info.isVoting()); + peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); + } + + final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); OnDemandRaftState.Builder builder = OnDemandRaftState.builder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) @@ -291,9 +431,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .snapshotIndex(replicatedLog().getSnapshotIndex()) .snapshotTerm(replicatedLog().getSnapshotTerm()) .votedFor(context.getTermInformation().getVotedFor()) - .peerAddresses(new HashMap<>(context.getPeerAddresses())); + .isVoting(context.isVotingMember()) + .peerAddresses(peerAddresses) + .peerVotingStates(peerVotingStates) + .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); - ReplicatedLogEntry lastLogEntry = getLastLogEntry(); + ReplicatedLogEntry lastLogEntry = replicatedLog().last(); if (lastLogEntry != null) { builder.lastLogIndex(lastLogEntry.getIndex()); builder.lastLogTerm(lastLogEntry.getTerm()); @@ -306,7 +449,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { for(String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()))); + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), + context.getPeerInfo(info.getId()).isVoting())); } builder.followerInfoList(followerInfoList); @@ -316,26 +460,32 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); if (oldBehavior != currentBehavior){ onStateChanged(); } - String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId(); + String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId(); String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId()) || + if(!Objects.equals(lastValidLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if(roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } - onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId()); + onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + + if(leadershipTransferInProgress != null) { + leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); + } + + serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId()); } if (roleChangeNotifier.isPresent() && @@ -367,8 +517,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(final ActorRef clientActor, final String identifier, - final Payload data) { + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, @@ -380,28 +529,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { - @Override - public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception { - if (!hasFollowers()){ - // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry.getIndex()); - raftContext.setLastApplied(replicatedLogEntry.getIndex()); + replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + if (!hasFollowers()){ + // Increment the Commit Index and the Last Applied values + raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); + raftContext.setLastApplied(replicatedLogEntry1.getIndex()); - // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); + // Apply the state immediately. + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); - // Send a ApplyJournalEntries message so that we write the fact that we applied - // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); + // Send a ApplyJournalEntries message so that we write the fact that we applied + // the state to durable storage + self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + } else if (clientActor != null) { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); - // Send message for replication - currentBehavior.handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry)); - } + // Send message for replication + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(clientActor, identifier, replicatedLogEntry1)); } }); } @@ -416,11 +562,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @VisibleForTesting void setCurrentBehavior(RaftActorBehavior behavior) { - currentBehavior.setDelegate(behavior); + context.setCurrentBehavior(behavior); } protected RaftActorBehavior getCurrentBehavior() { - return currentBehavior.getDelegate(); + return context.getCurrentBehavior(); } /** @@ -430,7 +576,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @return true it this RaftActor is a Leader false otherwise */ protected boolean isLeader() { - return context.getId().equals(currentBehavior.getLeaderId()); + return context.getId().equals(getCurrentBehavior().getLeaderId()); + } + + protected final boolean isLeaderActive() { + return getRaftState() != RaftState.IsolatedLeader && !shuttingDown && + !isLeadershipTransferInProgress(); + } + + private boolean isLeadershipTransferInProgress() { + return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } /** @@ -454,36 +609,50 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return the current leader's id */ - protected String getLeaderId(){ - return currentBehavior.getLeaderId(); + protected final String getLeaderId(){ + return getCurrentBehavior().getLeaderId(); } - protected RaftState getRaftState() { - return currentBehavior.state(); - } - - protected ReplicatedLogEntry getLastLogEntry() { - return replicatedLog().last(); + @VisibleForTesting + protected final RaftState getRaftState() { + return getCurrentBehavior().state(); } protected Long getCurrentTerm(){ return context.getTermInformation().getCurrentTerm(); } - protected Long getCommitIndex(){ - return context.getCommitIndex(); - } - - protected Long getLastApplied(){ - return context.getLastApplied(); - } - protected RaftActorContext getRaftActorContext() { return context; } protected void updateConfigParams(ConfigParams configParams) { + + // obtain the RaftPolicy for oldConfigParams and the updated one. + String oldRaftPolicy = context.getConfigParams(). + getCustomRaftPolicyImplementationClass(); + String newRaftPolicy = configParams. + getCustomRaftPolicyImplementationClass(); + + LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), + oldRaftPolicy, newRaftPolicy); context.setConfigParams(configParams); + if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) { + // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower + // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This + // avoids potential disruption. Otherwise, switch to Follower normally. + RaftActorBehavior behavior = getCurrentBehavior(); + if (behavior != null && behavior.state() == RaftState.Follower) { + String previousLeaderId = behavior.getLeaderId(); + short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); + + LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); + + changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion)); + } else { + initializeBehavior(); + } + } } public final DataPersistenceProvider persistence() { @@ -556,8 +725,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. @@ -589,13 +757,30 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract Optional getRoleChangeNotifier(); - protected void onLeaderChanged(String oldLeader, String newLeader){}; + /** + * This method is called prior to operations such as leadership transfer and actor shutdown when the leader + * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current + * work prior to performing the operation. On completion of any work, the run method must be called on the + * given Runnable to proceed with the given operation. Important: the run method must be called on + * this actor's thread dispatcher as as it modifies internal state. + *

+ * The default implementation immediately runs the operation. + * + * @param operation the operation to run + */ + protected void pauseLeader(Runnable operation) { + operation.run(); + } + + protected void onLeaderChanged(String oldLeader, String newLeader) { + + }; private String getLeaderAddress(){ if(isLeader()){ return getSelf().path().toString(); } - String leaderId = currentBehavior.getLeaderId(); + String leaderId = getLeaderId(); if (leaderId == null) { return null; } @@ -615,11 +800,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void captureSnapshot() { SnapshotManager snapshotManager = context.getSnapshotManager(); - if(!snapshotManager.isCapturing()) { + if (!snapshotManager.isCapturing()) { + final long idx = getCurrentBehavior().getReplicatedToAllIndex(); LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", - replicatedLog().last(), currentBehavior.getReplicatedToAllIndex()); + replicatedLog().last(), idx); + + snapshotManager.capture(replicatedLog().last(), idx); + } + } + + /** + * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader, + * in which case we need to step down. + */ + void becomeNonVoting() { + if (isLeader()) { + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(ActorRef raftActorRef) { + LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId()); + ensureFollowerState(); + } - snapshotManager.capture(replicatedLog().last(), currentBehavior.getReplicatedToAllIndex()); + @Override + public void onFailure(ActorRef raftActorRef) { + LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId()); + ensureFollowerState(); + } + + private void ensureFollowerState() { + // Whether or not leadership transfer succeeded, we have to step down as leader and + // switch to Follower so ensure that. + if (getRaftState() != RaftState.Follower) { + initializeBehavior(); + } + } + }); } } @@ -668,46 +884,88 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private static class BehaviorStateHolder { - private RaftActorBehavior behavior; - private String leaderId; - private short leaderPayloadVersion; + /** + * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors. + */ + private static abstract class BehaviorState implements Immutable { + @Nullable abstract RaftActorBehavior getBehavior(); + @Nullable abstract String getLastValidLeaderId(); + @Nullable abstract short getLeaderPayloadVersion(); + } - void init(RaftActorBehavior behavior) { - this.behavior = behavior; - this.leaderId = behavior != null ? behavior.getLeaderId() : null; - this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1; + /** + * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state. + */ + private static final class SimpleBehaviorState extends BehaviorState { + private final RaftActorBehavior behavior; + private final String lastValidLeaderId; + private final short leaderPayloadVersion; + + SimpleBehaviorState(final String lastValidLeaderId, final RaftActorBehavior behavior) { + this.lastValidLeaderId = lastValidLeaderId; + this.behavior = Preconditions.checkNotNull(behavior); + this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } + @Override RaftActorBehavior getBehavior() { return behavior; } - String getLeaderId() { - return leaderId; + @Override + String getLastValidLeaderId() { + return lastValidLeaderId; } + @Override short getLeaderPayloadVersion() { return leaderPayloadVersion; } } - private class SwitchBehaviorSupplier implements Supplier { - private Object message; - private ActorRef sender; + /** + * Class tracking behavior-related information, which we need to keep around and pass across behavior switches. + * An instance is created for each RaftActor. It has two functions: + * - it keeps track of the last leader ID we have encountered since we have been created + * - it creates state capture needed to transition from one behavior to the next + */ + private static final class BehaviorStateTracker { + /** + * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only + * allowed before we receive the first message, we know the leader ID to be null. + */ + private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() { + @Override + RaftActorBehavior getBehavior() { + return null; + } - public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){ - this.sender = sender; - this.message = message; - return this; - } + @Override + String getLastValidLeaderId() { + return null; + } - @Override - public RaftActorBehavior get() { - if(this.message instanceof SwitchBehavior){ - return ((SwitchBehavior) message).getNewState().createBehavior(getRaftActorContext()); + @Override + short getLeaderPayloadVersion() { + return -1; + } + }; + + private String lastValidLeaderId; + + BehaviorState capture(final RaftActorBehavior behavior) { + if (behavior == null) { + Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); + return NULL_BEHAVIOR_STATE; } - return currentBehavior.handleMessage(sender, message); + + final String leaderId = behavior.getLeaderId(); + if (leaderId != null) { + lastValidLeaderId = leaderId; + } + + return new SimpleBehaviorState(lastValidLeaderId, behavior); } } + }