X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=57eb2647c9ebbc3ab345b93b16bcbfde44a65b1c;hb=refs%2Fchanges%2F84%2F46084%2F3;hp=945af8d47d0415200d41462cc15bb7877a113a2a;hpb=d639c62a1e743ec846b15bf755b830b00f4cc7eb;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 945af8d47d..57eb2647c9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,19 +12,19 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; -import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Supplier; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Lists; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; @@ -33,7 +33,6 @@ import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; @@ -49,7 +48,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Immutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,14 +112,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final PersistentDataProvider persistentProvider; + private final BehaviorStateTracker behaviorStateTracker = new BehaviorStateTracker(); + private RaftActorRecoverySupport raftRecovery; private RaftActorSnapshotMessageSupport snapshotSupport; - private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); - - private final SwitchBehaviorSupplier reusableSwitchBehaviorSupplier = new SwitchBehaviorSupplier(); - private RaftActorServerConfigurationSupport serverConfigurationSupport; private RaftActorLeadershipTransferCohort leadershipTransferInProgress; @@ -132,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); @@ -169,13 +170,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); raftRecovery = null; - - if (context.getReplicatedLog().size() > 0) { - self().tell(new InitiateCaptureSnapshot(), self()); - LOG.info("{}: Snapshot capture initiated after recovery", persistenceId()); - } else { - LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId()); - } } } @@ -189,21 +183,39 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - protected void changeCurrentBehavior(RaftActorBehavior newBehavior){ - if(getCurrentBehavior() != null) { + protected void changeCurrentBehavior(RaftActorBehavior newBehavior) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + if (currentBehavior != null) { try { - getCurrentBehavior().close(); - } catch(Exception e) { - LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e); + currentBehavior.close(); + } catch (Exception e) { + LOG.warn("{}: Error closing behavior {}", persistence(), currentBehavior, e); } } - reusableBehaviorStateHolder.init(getCurrentBehavior()); + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); setCurrentBehavior(newBehavior); - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + handleBehaviorChange(state, newBehavior); } + /** + * Method exposed for subclasses to plug-in their logic. This method is invoked by {@link #handleCommand(Object)} + * for messages which are not handled by this class. Subclasses overriding this class should fall back to this + * implementation for messages which they do not handle + * + * @param message Incoming command message + */ + protected void handleNonRaftCommand(final Object message) { + unhandled(message); + } + + /** + * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override + * {@link #handleNonRaftCommand(Object)} instead. + */ + @Deprecated @Override + // FIXME: make this method final once our unit tests do not need to override it protected void handleCommand(final Object message) { if (serverConfigurationSupport.handleMessage(message, getSender())) { return; @@ -215,11 +227,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long elapsedTime = (System.nanoTime() - applyState.getStartTime()); - if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ - LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } + long startTime = System.nanoTime(); if(LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", @@ -227,8 +235,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { applyState.getReplicatedLogEntry().getData()); } - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); + if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), + applyState.getReplicatedLogEntry().getData()); + } + + long elapsedTime = System.nanoTime() - startTime; + if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } if (!hasFollowers()) { // for single node, the capture should happen after the apply state @@ -240,6 +256,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } + // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. + possiblyHandleBehaviorMessage(message); + } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; if(LOG.isDebugEnabled()) { @@ -258,31 +277,49 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if(message instanceof InitiateCaptureSnapshot) { captureSnapshot(); } else if(message instanceof SwitchBehavior) { - switchBehavior(((SwitchBehavior) message)); + switchBehavior((SwitchBehavior) message); } else if(message instanceof LeaderTransitioning) { onLeaderTransitioning(); } else if(message instanceof Shutdown) { onShutDown(); } else if(message instanceof Runnable) { ((Runnable)message).run(); - } else { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + } else if(message instanceof NoopPayload) { + persistData(null, null, (NoopPayload)message); + } else if (!possiblyHandleBehaviorMessage(message)) { + handleNonRaftCommand(message); } } + private boolean possiblyHandleBehaviorMessage(final Object message) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); + + // A behavior indicates that it processed the change by returning a reference to the next behavior + // to be used. A null return indicates it has not processed the message and we should be passing it to + // the subclass for handling. + final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); + if (nextBehavior != null) { + switchBehavior(state, nextBehavior); + return true; + } + + return false; + } + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { LOG.debug("{}: Initiating leader transfer", persistenceId()); if(leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender()); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { leadershipTransferInProgress = null; } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { leadershipTransferInProgress = null; } }); @@ -305,21 +342,27 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) { + if (currentBehavior.state() != RaftState.Leader) { + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; + } + + if (context.hasFollowers()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } }); - } else if(currentBehavior.state() == RaftState.Leader) { + } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override protected void doRun() { @@ -331,8 +374,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(PoisonPill.getInstance(), self()); } }); - } else { - self().tell(PoisonPill.getInstance(), self()); } } @@ -349,7 +390,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); if( newState == RaftState.Leader || newState == RaftState.Follower) { - switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message)); + switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), + AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); } else { LOG.warn("Switching to behavior : {} - not supported", newState); @@ -357,12 +399,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void switchBehavior(Supplier supplier){ - reusableBehaviorStateHolder.init(getCurrentBehavior()); - - setCurrentBehavior(supplier.get()); - - handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); + private void switchBehavior(final BehaviorState oldBehaviorState, final RaftActorBehavior nextBehavior) { + setCurrentBehavior(nextBehavior); + handleBehaviorChange(oldBehaviorState, nextBehavior); } @VisibleForTesting @@ -374,8 +413,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Debugging message to retrieve raft stats. Map peerAddresses = new HashMap<>(); - for(String peerId: context.getPeerIds()) { - peerAddresses.put(peerId, context.getPeerAddress(peerId)); + Map peerVotingStates = new HashMap<>(); + for(PeerInfo info: context.getPeers()) { + peerVotingStates.put(info.getId(), info.isVoting()); + peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); @@ -394,7 +435,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { .snapshotIndex(replicatedLog().getSnapshotIndex()) .snapshotTerm(replicatedLog().getSnapshotTerm()) .votedFor(context.getTermInformation().getVotedFor()) + .isVoting(context.isVotingMember()) .peerAddresses(peerAddresses) + .peerVotingStates(peerVotingStates) .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass()); ReplicatedLogEntry lastLogEntry = replicatedLog().last(); @@ -410,7 +453,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { for(String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()))); + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), + context.getPeerInfo(info.getId()).isVoting())); } builder.followerInfoList(followerInfoList); @@ -420,19 +464,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); if (oldBehavior != currentBehavior){ onStateChanged(); } + String lastLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastLeaderId(); String lastValidLeaderId = oldBehavior == null ? null : oldBehaviorState.getLastValidLeaderId(); String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name(); // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equal(lastValidLeaderId, currentBehavior.getLeaderId()) || + if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { if(roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), @@ -449,7 +494,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } if (roleChangeNotifier.isPresent() && - (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) { + (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } @@ -477,8 +522,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param identifier * @param data */ - protected void persistData(final ActorRef clientActor, final String identifier, - final Payload data) { + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( context.getReplicatedLog().lastIndex() + 1, @@ -490,28 +534,25 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, new Procedure() { - @Override - public void apply(ReplicatedLogEntry replicatedLogEntry) { - if (!hasFollowers()){ - // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry.getIndex()); - raftContext.setLastApplied(replicatedLogEntry.getIndex()); + replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + if (!hasFollowers()){ + // Increment the Commit Index and the Last Applied values + raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); + raftContext.setLastApplied(replicatedLogEntry1.getIndex()); - // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self()); + // Apply the state immediately. + self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); - // Send a ApplyJournalEntries message so that we write the fact that we applied - // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self()); + // Send a ApplyJournalEntries message so that we write the fact that we applied + // the state to durable storage + self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); - } else if (clientActor != null) { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); + } else { + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry)); - } + // Send message for replication + getCurrentBehavior().handleMessage(getSelf(), + new Replicate(clientActor, identifier, replicatedLogEntry1)); } }); } @@ -544,8 +585,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected final boolean isLeaderActive() { - return getRaftState() != RaftState.IsolatedLeader && !shuttingDown && - !isLeadershipTransferInProgress(); + return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader && + !shuttingDown && !isLeadershipTransferInProgress(); } private boolean isLeadershipTransferInProgress() { @@ -559,7 +600,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return A reference to the leader if known, null otherwise */ - protected ActorSelection getLeader(){ + public ActorSelection getLeader(){ String leaderAddress = getLeaderAddress(); if(leaderAddress == null){ @@ -601,13 +642,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), oldRaftPolicy, newRaftPolicy); context.setConfigParams(configParams); - if (!Objects.equal(oldRaftPolicy, newRaftPolicy)) { + if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) { // The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower // but transfer the previous leaderId so it doesn't immediately try to schedule an election. This // avoids potential disruption. Otherwise, switch to Follower normally. RaftActorBehavior behavior = getCurrentBehavior(); - if(behavior instanceof Follower) { - String previousLeaderId = ((Follower)behavior).getLeaderId(); + if (behavior != null && behavior.state() == RaftState.Follower) { + String previousLeaderId = behavior.getLeaderId(); short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); @@ -628,9 +669,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void setPersistence(boolean persistent) { - if(persistent) { + DataPersistenceProvider currentPersistence = persistence(); + if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); - } else { + + if(getCurrentBehavior() != null) { + LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId()); + captureSnapshot(); + } + } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { setPersistence(new NonPersistentDataProvider() { /** * The way snapshotting works is, @@ -689,8 +736,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. @@ -739,7 +785,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void onLeaderChanged(String oldLeader, String newLeader) { - }; + } private String getLeaderAddress(){ if(isLeader()){ @@ -782,13 +828,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (isLeader()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) { + public void onSuccess(ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId()); ensureFollowerState(); } @Override - public void onFailure(ActorRef raftActorRef, ActorRef replyTo) { + public void onFailure(ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId()); ensureFollowerState(); } @@ -805,94 +851,102 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** - * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries} - * whose type for fromIndex is long instead of int. This class was kept for backwards - * compatibility with Helium. + * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors. */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class DeleteEntries implements Serializable { - private final int fromIndex; - - public DeleteEntries(int fromIndex) { - this.fromIndex = fromIndex; - } - - public int getFromIndex() { - return fromIndex; - } + private static abstract class BehaviorState implements Immutable { + @Nullable abstract RaftActorBehavior getBehavior(); + @Nullable abstract String getLastValidLeaderId(); + @Nullable abstract String getLastLeaderId(); + @Nullable abstract short getLeaderPayloadVersion(); } /** - * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm} - * which has serialVersionUID set. This class was kept for backwards compatibility with Helium. + * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state. */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class UpdateElectionTerm implements Serializable { - private final long currentTerm; - private final String votedFor; - - public UpdateElectionTerm(long currentTerm, String votedFor) { - this.currentTerm = currentTerm; - this.votedFor = votedFor; - } - - public long getCurrentTerm() { - return currentTerm; - } + private static final class SimpleBehaviorState extends BehaviorState { + private final RaftActorBehavior behavior; + private final String lastValidLeaderId; + private final String lastLeaderId; + private final short leaderPayloadVersion; - public String getVotedFor() { - return votedFor; - } - } - - private static class BehaviorStateHolder { - private RaftActorBehavior behavior; - private String lastValidLeaderId; - private short leaderPayloadVersion; - - void init(RaftActorBehavior behavior) { - this.behavior = behavior; - this.leaderPayloadVersion = behavior != null ? behavior.getLeaderPayloadVersion() : -1; - - String behaviorLeaderId = behavior != null ? behavior.getLeaderId() : null; - if(behaviorLeaderId != null) { - this.lastValidLeaderId = behaviorLeaderId; - } + SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId, + final RaftActorBehavior behavior) { + this.lastValidLeaderId = lastValidLeaderId; + this.lastLeaderId = lastLeaderId; + this.behavior = Preconditions.checkNotNull(behavior); + this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } + @Override RaftActorBehavior getBehavior() { return behavior; } + @Override String getLastValidLeaderId() { return lastValidLeaderId; } + @Override short getLeaderPayloadVersion() { return leaderPayloadVersion; } + + @Override + String getLastLeaderId() { + return lastLeaderId; + } } - private class SwitchBehaviorSupplier implements Supplier { - private Object message; - private ActorRef sender; + /** + * Class tracking behavior-related information, which we need to keep around and pass across behavior switches. + * An instance is created for each RaftActor. It has two functions: + * - it keeps track of the last leader ID we have encountered since we have been created + * - it creates state capture needed to transition from one behavior to the next + */ + private static final class BehaviorStateTracker { + /** + * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only + * allowed before we receive the first message, we know the leader ID to be null. + */ + private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() { + @Override + RaftActorBehavior getBehavior() { + return null; + } - public SwitchBehaviorSupplier handleMessage(ActorRef sender, Object message){ - this.sender = sender; - this.message = message; - return this; - } + @Override + String getLastValidLeaderId() { + return null; + } - @Override - public RaftActorBehavior get() { - if(this.message instanceof SwitchBehavior){ - return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState()); + @Override + short getLeaderPayloadVersion() { + return -1; } - return getCurrentBehavior().handleMessage(sender, message); + + @Override + String getLastLeaderId() { + return null; + } + }; + + private String lastValidLeaderId; + private String lastLeaderId; + + BehaviorState capture(final RaftActorBehavior behavior) { + if (behavior == null) { + Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); + return NULL_BEHAVIOR_STATE; + } + + lastLeaderId = behavior.getLeaderId(); + if (lastLeaderId != null) { + lastValidLeaderId = lastLeaderId; + } + + return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior); } } + }