() {
- @Override
- public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
- if(!hasFollowers()){
- // Increment the Commit Index and the Last Applied values
- raftContext.setCommitIndex(replicatedLogEntry.getIndex());
- raftContext.setLastApplied(replicatedLogEntry.getIndex());
+ boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+ // Clear the persistence pending flag in the log entry.
+ persistedLogEntry.setPersistencePending(false);
- // Apply the state immediately
- applyState(clientActor, identifier, data);
+ if (!hasFollowers()) {
+ // Increment the Commit Index and the Last Applied values
+ raftContext.setCommitIndex(persistedLogEntry.getIndex());
+ raftContext.setLastApplied(persistedLogEntry.getIndex());
- // Send a ApplyJournalEntries message so that we write the fact that we applied
- // the state to durable storage
- self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
+ // Apply the state immediately.
+ handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry));
- context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+ // Send a ApplyJournalEntries message so that we write the fact that we applied
+ // the state to durable storage
+ self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
- } else if (clientActor != null) {
- // Send message for replication
- currentBehavior.handleMessage(getSelf(),
- new Replicate(clientActor, identifier,
- replicatedLogEntry)
- );
- }
+ } else {
+ context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
- }
- }); }
+ // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+ // normally should still be the leader) to check if consensus has now been reached in conjunction with
+ // follower replication.
+ getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
+ }
+ }, true);
+
+ if (wasAppended && hasFollowers()) {
+ // Send log entry for replication.
+ getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry,
+ !batchHint));
+ }
+ }
+
+ private ReplicatedLog replicatedLog() {
+ return context.getReplicatedLog();
+ }
protected String getId() {
return context.getId();
}
+ @VisibleForTesting
+ void setCurrentBehavior(final RaftActorBehavior behavior) {
+ context.setCurrentBehavior(behavior);
+ }
+
+ protected RaftActorBehavior getCurrentBehavior() {
+ return context.getCurrentBehavior();
+ }
+
/**
* Derived actors can call the isLeader method to check if the current
- * RaftActor is the Leader or not
+ * RaftActor is the Leader or not.
*
* @return true it this RaftActor is a Leader false otherwise
*/
protected boolean isLeader() {
- return context.getId().equals(currentBehavior.getLeaderId());
+ return context.getId().equals(getCurrentBehavior().getLeaderId());
+ }
+
+ protected final boolean isLeaderActive() {
+ return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader
+ && !shuttingDown && !isLeadershipTransferInProgress();
+ }
+
+ protected boolean isLeadershipTransferInProgress() {
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+ return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
/**
@@ -541,10 +674,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
*
* @return A reference to the leader if known, null otherwise
*/
- protected ActorSelection getLeader(){
+ public ActorSelection getLeader() {
String leaderAddress = getLeaderAddress();
- if(leaderAddress == null){
+ if (leaderAddress == null) {
return null;
}
@@ -552,66 +685,116 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
/**
+ * Returns the id of the current leader.
*
* @return the current leader's id
*/
- protected String getLeaderId(){
- return currentBehavior.getLeaderId();
+ protected final String getLeaderId() {
+ return getCurrentBehavior().getLeaderId();
}
- protected RaftState getRaftState() {
- return currentBehavior.state();
+ @VisibleForTesting
+ protected final RaftState getRaftState() {
+ return getCurrentBehavior().state();
}
- protected ReplicatedLogEntry getLastLogEntry() {
- return replicatedLog.last();
+ protected Long getCurrentTerm() {
+ return context.getTermInformation().getCurrentTerm();
}
- protected Long getCurrentTerm(){
- return context.getTermInformation().getCurrentTerm();
+ protected RaftActorContext getRaftActorContext() {
+ return context;
}
- protected Long getCommitIndex(){
- return context.getCommitIndex();
+ protected void updateConfigParams(final ConfigParams configParams) {
+
+ // obtain the RaftPolicy for oldConfigParams and the updated one.
+ String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
+ String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
+
+ LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
+ oldRaftPolicy, newRaftPolicy);
+ context.setConfigParams(configParams);
+ if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
+ // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
+ // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
+ // avoids potential disruption. Otherwise, switch to Follower normally.
+ RaftActorBehavior behavior = getCurrentBehavior();
+ if (behavior != null && behavior.state() == RaftState.Follower) {
+ String previousLeaderId = behavior.getLeaderId();
+ short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
+
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
+ previousLeaderId);
+
+ changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
+ } else {
+ initializeBehavior();
+ }
+ }
}
- protected Long getLastApplied(){
- return context.getLastApplied();
+ public final DataPersistenceProvider persistence() {
+ return delegatingPersistenceProvider.getDelegate();
}
- protected RaftActorContext getRaftActorContext() {
- return context;
+ public void setPersistence(final DataPersistenceProvider provider) {
+ delegatingPersistenceProvider.setDelegate(provider);
}
- protected void updateConfigParams(ConfigParams configParams) {
- context.setConfigParams(configParams);
+ protected void setPersistence(final boolean persistent) {
+ DataPersistenceProvider currentPersistence = persistence();
+ if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) {
+ setPersistence(new PersistentDataProvider(this));
+
+ if (getCurrentBehavior() != null) {
+ LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+ captureSnapshot();
+ }
+ } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
+ setPersistence(new NonPersistentDataProvider(this) {
+ /*
+ * The way snapshotting works is,
+ *
+ * - RaftActor calls createSnapshot on the Shard
+ *
- Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
+ *
- When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
+ * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
+ * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
+ * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
+ *
+ */
+ @Override
+ public void saveSnapshot(final Object object) {
+ // Make saving Snapshot successful
+ // Committing the snapshot here would end up calling commit in the creating state which would
+ // be a state violation. That's why now we send a message to commit the snapshot.
+ self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
+ }
+ });
+ }
}
/**
* setPeerAddress sets the address of a known peer at a later time.
+ *
*
* This is to account for situations where a we know that a peer
* exists but we do not know an address up-front. This may also be used in
* situations where a known peer starts off in a different location and we
* need to change it's address
+ *
*
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
- *
- * @param peerId
- * @param peerAddress
*/
- protected void setPeerAddress(String peerId, String peerAddress){
+ protected void setPeerAddress(final String peerId, final String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
- protected void commitSnapshot(long sequenceNumber) {
- context.getSnapshotManager().commit(persistence(), sequenceNumber);
- }
-
/**
* The applyState method will be called by the RaftActor when some data
- * needs to be applied to the actor's state
+ * needs to be applied to the actor's state.
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
@@ -626,351 +809,234 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
* @param data A piece of data that was persisted by the persistData call.
* This should NEVER be null.
*/
- protected abstract void applyState(ActorRef clientActor, String identifier,
- Object data);
+ protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
/**
- * This method is called during recovery at the start of a batch of state entries. Derived
- * classes should perform any initialization needed to start a batch.
+ * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- protected abstract void startLogRecoveryBatch(int maxBatchSize);
+ @Nonnull
+ protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
- * This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startLogRecoveryBatch}.
- *
- * @param data the state data
+ * This method is called when recovery is complete.
*/
- protected abstract void appendRecoveredLogEntry(Payload data);
+ protected abstract void onRecoveryComplete();
/**
- * This method is called during recovery to reconstruct the state of the actor.
- *
- * @param snapshotBytes A snapshot of the state of the actor
+ * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
- protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
+ @Nonnull
+ protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
- * This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+ * This method will be called by the RaftActor when the state of the
+ * RaftActor changes. The derived actor can then use methods like
+ * isLeader or getLeader to do something useful
*/
- protected abstract void applyCurrentLogRecoveryBatch();
+ protected abstract void onStateChanged();
/**
- * This method is called when recovery is complete.
+ * Notifier Actor for this RaftActor to notify when a role change happens.
+ *
+ * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
- protected abstract void onRecoveryComplete();
+ protected abstract Optional getRoleChangeNotifier();
/**
- * This method will be called by the RaftActor when a snapshot needs to be
- * created. The derived actor should respond with its current state.
- *
- * During recovery the state that is returned by the derived actor will
- * be passed back to it by calling the applySnapshot method
- *
- * @return The current state of the actor
+ * This method is called on the leader when a voting change operation completes.
*/
- protected abstract void createSnapshot();
+ protected void onVotingStateChangeComplete() {
+ }
/**
- * This method can be called at any other point during normal
- * operations when the derived actor is out of sync with it's peers
- * and the only way to bring it in sync is by applying a snapshot
+ * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+ * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+ * work prior to performing the operation. On completion of any work, the run method must be called on the
+ * given Runnable to proceed with the given operation. Important: the run method must be called on
+ * this actor's thread dispatcher as as it modifies internal state.
*
- * @param snapshotBytes A snapshot of the state of the actor
+ *
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
*/
- protected abstract void applySnapshot(byte[] snapshotBytes);
+ protected void pauseLeader(final Runnable operation) {
+ operation.run();
+ }
/**
- * This method will be called by the RaftActor when the state of the
- * RaftActor changes. The derived actor can then use methods like
- * isLeader or getLeader to do something useful
+ * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+ * should resume normal operations.
+ *
+ *
+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
*/
- protected abstract void onStateChanged();
+ protected void unpauseLeader() {
- protected abstract DataPersistenceProvider persistence();
-
- /**
- * Notifier Actor for this RaftActor to notify when a role change happens
- * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
- */
- protected abstract Optional getRoleChangeNotifier();
+ }
- protected void onLeaderChanged(String oldLeader, String newLeader){};
+ protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+ }
- private String getLeaderAddress(){
- if(isLeader()){
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
- String leaderId = currentBehavior.getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
return peerAddress;
}
- private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
+ protected boolean hasFollowers() {
+ return getRaftActorContext().hasFollowers();
}
- protected long getTotalMemory() {
- return Runtime.getRuntime().totalMemory();
- }
+ private void captureSnapshot() {
+ SnapshotManager snapshotManager = context.getSnapshotManager();
- protected boolean hasFollowers(){
- return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
- }
-
- private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
- private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0L;
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+ LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
+ replicatedLog().last(), idx);
-
- public ReplicatedLogImpl(Snapshot snapshot) {
- super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries());
+ snapshotManager.capture(replicatedLog().last(), idx);
}
+ }
- public ReplicatedLogImpl() {
- super();
- }
-
- @Override public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
- // FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure() {
-
+ /**
+ * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
+ * in which case we need to step down.
+ */
+ void becomeNonVoting() {
+ if (isLeader()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void apply(DeleteEntries param)
- throws Exception {
- //FIXME : Doing nothing for now
- dataSize = 0;
- for (ReplicatedLogEntry entry : journal) {
- dataSize += entry.size();
- }
+ public void onSuccess(final ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+ ensureFollowerState();
}
- });
- }
-
- @Override public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
- }
- public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry,
- final Procedure callback) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
- }
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+ ensureFollowerState();
+ }
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- journal.add(replicatedLogEntry);
-
- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- persistence().persist(replicatedLogEntry,
- new Procedure() {
- @Override
- public void apply(ReplicatedLogEntry evt) throws Exception {
- int logEntrySize = replicatedLogEntry.size();
-
- dataSize += logEntrySize;
- long dataSizeForCheck = dataSize;
-
- dataSizeSinceLastSnapshot += logEntrySize;
-
- if (!hasFollowers()) {
- // When we do not have followers we do not maintain an in-memory log
- // due to this the journalSize will never become anything close to the
- // snapshot batch count. In fact will mostly be 1.
- // Similarly since the journal's dataSize depends on the entries in the
- // journal the journal's dataSize will never reach a value close to the
- // memory threshold.
- // By maintaining the dataSize outside the journal we are tracking essentially
- // what we have written to the disk however since we no longer are in
- // need of doing a snapshot just for the sake of freeing up memory we adjust
- // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
- // as if we were maintaining a real snapshot
- dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
- }
- long journalSize = replicatedLogEntry.getIndex() + 1;
- long dataThreshold = getTotalMemory() *
- context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
- || dataSizeForCheck > dataThreshold)) {
-
- boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
- currentBehavior.getReplicatedToAllIndex());
-
- if(started){
- dataSizeSinceLastSnapshot = 0;
- }
-
- }
-
- if (callback != null){
- callback.apply(replicatedLogEntry);
- }
+ private void ensureFollowerState() {
+ // Whether or not leadership transfer succeeded, we have to step down as leader and
+ // switch to Follower so ensure that.
+ if (getRaftState() != RaftState.Follower) {
+ initializeBehavior();
}
}
- );
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
-
}
- static class DeleteEntries implements Serializable {
- private static final long serialVersionUID = 1L;
- private final int fromIndex;
-
- public DeleteEntries(int fromIndex) {
- this.fromIndex = fromIndex;
- }
+ /**
+ * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
+ */
+ private abstract static class BehaviorState implements Immutable {
+ @Nullable abstract RaftActorBehavior getBehavior();
- public int getFromIndex() {
- return fromIndex;
- }
- }
+ @Nullable abstract String getLastValidLeaderId();
+ @Nullable abstract String getLastLeaderId();
- private class ElectionTermImpl implements ElectionTerm {
- /**
- * Identifier of the actor whose election term information this is
- */
- private long currentTerm = 0;
- private String votedFor = null;
+ @Nullable abstract short getLeaderPayloadVersion();
+ }
- @Override
- public long getCurrentTerm() {
- return currentTerm;
+ /**
+ * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state.
+ */
+ private static final class SimpleBehaviorState extends BehaviorState {
+ private final RaftActorBehavior behavior;
+ private final String lastValidLeaderId;
+ private final String lastLeaderId;
+ private final short leaderPayloadVersion;
+
+ SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
+ final RaftActorBehavior behavior) {
+ this.lastValidLeaderId = lastValidLeaderId;
+ this.lastLeaderId = lastLeaderId;
+ this.behavior = Preconditions.checkNotNull(behavior);
+ this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
@Override
- public String getVotedFor() {
- return votedFor;
- }
-
- @Override public void update(long currentTerm, String votedFor) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
- }
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ RaftActorBehavior getBehavior() {
+ return behavior;
}
@Override
- public void updateAndPersist(long currentTerm, String votedFor){
- update(currentTerm, votedFor);
- // FIXME : Maybe first persist then update the state
- persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){
-
- @Override public void apply(UpdateElectionTerm param)
- throws Exception {
-
- }
- });
- }
- }
-
- static class UpdateElectionTerm implements Serializable {
- private static final long serialVersionUID = 1L;
- private final long currentTerm;
- private final String votedFor;
-
- public UpdateElectionTerm(long currentTerm, String votedFor) {
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
+ String getLastValidLeaderId() {
+ return lastValidLeaderId;
}
- public long getCurrentTerm() {
- return currentTerm;
+ @Override
+ short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
}
- public String getVotedFor() {
- return votedFor;
+ @Override
+ String getLastLeaderId() {
+ return lastLeaderId;
}
}
- protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
-
- public NonPersistentRaftDataProvider(){
-
- }
-
+ /**
+ * Class tracking behavior-related information, which we need to keep around and pass across behavior switches.
+ * An instance is created for each RaftActor. It has two functions:
+ * - it keeps track of the last leader ID we have encountered since we have been created
+ * - it creates state capture needed to transition from one behavior to the next
+ */
+ private static final class BehaviorStateTracker {
/**
- * The way snapshotting works is,
- *
- * - RaftActor calls createSnapshot on the Shard
- *
- Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
- *
- When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
- * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
- * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
- * in SaveSnapshotSuccess.
- *
- * @param o
+ * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
+ * allowed before we receive the first message, we know the leader ID to be null.
*/
- @Override
- public void saveSnapshot(Object o) {
- // Make saving Snapshot successful
- // Committing the snapshot here would end up calling commit in the creating state which would
- // be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
- }
- }
-
-
- private class CreateSnapshotProcedure implements Procedure {
+ private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
+ @Override
+ RaftActorBehavior getBehavior() {
+ return null;
+ }
- @Override
- public void apply(Void aVoid) throws Exception {
- createSnapshot();
- }
- }
+ @Override
+ String getLastValidLeaderId() {
+ return null;
+ }
- @VisibleForTesting
- void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
- currentBehavior = behavior;
- }
+ @Override
+ short getLeaderPayloadVersion() {
+ return -1;
+ }
- protected RaftActorBehavior getCurrentBehavior() {
- return currentBehavior;
- }
+ @Override
+ String getLastLeaderId() {
+ return null;
+ }
+ };
- private static class BehaviorStateHolder {
- private RaftActorBehavior behavior;
- private String leaderId;
+ private String lastValidLeaderId;
+ private String lastLeaderId;
- void init(RaftActorBehavior behavior) {
- this.behavior = behavior;
- this.leaderId = behavior != null ? behavior.getLeaderId() : null;
- }
+ BehaviorState capture(final RaftActorBehavior behavior) {
+ if (behavior == null) {
+ Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ return NULL_BEHAVIOR_STATE;
+ }
- RaftActorBehavior getBehavior() {
- return behavior;
- }
+ lastLeaderId = behavior.getLeaderId();
+ if (lastLeaderId != null) {
+ lastValidLeaderId = lastLeaderId;
+ }
- String getLeaderId() {
- return leaderId;
+ return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
}