X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=cb07e59c5b9a44ad1c7fe778bf74de4743b576ea;hb=refs%2Fchanges%2F65%2F45865%2F4;hp=dce9ee8d596dccf6bc46b0f8b98bf4e5365570db;hpb=c42a5e91e0dcfc499b33a321ef45c0d310d366cc;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index dce9ee8d59..cb07e59c5b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -17,7 +17,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Lists; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -54,8 +53,6 @@ import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -101,11 +98,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - /** * This context should NOT be passed directly to any other actor it is - * only to be consumed by the RaftActorBehaviors + * only to be consumed by the RaftActorBehaviors. */ private final RaftActorContextImpl context; @@ -125,7 +120,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private boolean shuttingDown; - public RaftActor(String id, Map peerAddresses, + protected RaftActor(String id, Map peerAddresses, Optional configParams, short payloadVersion) { persistentProvider = new PersistentDataProvider(this); @@ -134,7 +129,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, - configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl(), + configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), delegatingPersistenceProvider, LOG); context.setPayloadVersion(payloadVersion); @@ -160,12 +155,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override protected void handleRecover(Object message) { - if(raftRecovery == null) { + if (raftRecovery == null) { raftRecovery = newRaftActorRecoverySupport(); } boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message, persistentProvider); - if(recoveryComplete) { + if (recoveryComplete) { onRecoveryComplete(); initializeBehavior(); @@ -179,7 +174,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - void initializeBehavior(){ + void initializeBehavior() { changeCurrentBehavior(new Follower(context)); } @@ -230,7 +225,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { long startTime = System.nanoTime(); - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("{}: Applying state for log index {} data {}", persistenceId(), applyState.getReplicatedLogEntry().getIndex(), applyState.getReplicatedLogEntry().getData()); @@ -242,7 +237,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } long elapsedTime = System.nanoTime() - startTime; - if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){ + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); } @@ -257,11 +252,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } + // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. + possiblyHandleBehaviorMessage(message); + } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); - } + LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persist(applyEntries, NoopProcedure.instance()); @@ -270,41 +266,45 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { new FindLeaderReply(getLeaderAddress()), getSelf() ); - } else if(message instanceof GetOnDemandRaftState) { + } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); - } else if(message instanceof InitiateCaptureSnapshot) { + } else if (message instanceof InitiateCaptureSnapshot) { captureSnapshot(); - } else if(message instanceof SwitchBehavior) { + } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); - } else if(message instanceof LeaderTransitioning) { + } else if (message instanceof LeaderTransitioning) { onLeaderTransitioning(); - } else if(message instanceof Shutdown) { + } else if (message instanceof Shutdown) { onShutDown(); - } else if(message instanceof Runnable) { + } else if (message instanceof Runnable) { ((Runnable)message).run(); - } else if(message instanceof NoopPayload) { + } else if (message instanceof NoopPayload) { persistData(null, null, (NoopPayload)message); - } else { - // Processing the message may affect the state, hence we need to capture it - final RaftActorBehavior currentBehavior = getCurrentBehavior(); - final BehaviorState state = behaviorStateTracker.capture(currentBehavior); - - // A behavior indicates that it processed the change by returning a reference to the next behavior - // to be used. A null return indicates it has not processed the message and we should be passing it to - // the subclass for handling. - final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); - if (nextBehavior != null) { - switchBehavior(state, nextBehavior); - } else { - handleNonRaftCommand(message); - } + } else if (!possiblyHandleBehaviorMessage(message)) { + handleNonRaftCommand(message); } } + private boolean possiblyHandleBehaviorMessage(final Object message) { + final RaftActorBehavior currentBehavior = getCurrentBehavior(); + final BehaviorState state = behaviorStateTracker.capture(currentBehavior); + + // A behavior indicates that it processed the change by returning a reference to the next behavior + // to be used. A null return indicates it has not processed the message and we should be passing it to + // the subclass for handling. + final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message); + if (nextBehavior != null) { + switchBehavior(state, nextBehavior); + return true; + } + + return false; + } + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { LOG.debug("{}: Initiating leader transfer", persistenceId()); - if(leadershipTransferInProgress == null) { + if (leadershipTransferInProgress == null) { leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override @@ -329,7 +329,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onShutDown() { LOG.debug("{}: onShutDown", persistenceId()); - if(shuttingDown) { + if (shuttingDown) { return; } @@ -374,16 +374,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void onLeaderTransitioning() { LOG.debug("{}: onLeaderTransitioning", persistenceId()); Optional roleChangeNotifier = getRoleChangeNotifier(); - if(getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } private void switchBehavior(SwitchBehavior message) { - if(!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { + if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); - if( newState == RaftState.Leader || newState == RaftState.Follower) { + if ( newState == RaftState.Leader || newState == RaftState.Follower) { switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); @@ -408,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Map peerAddresses = new HashMap<>(); Map peerVotingStates = new HashMap<>(); - for(PeerInfo info: context.getPeers()) { + for (PeerInfo info: context.getPeers()) { peerVotingStates.put(info.getId(), info.isVoting()); peerAddresses.put(info.getId(), info.getAddress() != null ? info.getAddress() : ""); } @@ -440,11 +440,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { builder.lastLogTerm(lastLogEntry.getTerm()); } - if(getCurrentBehavior() instanceof AbstractLeader) { + if (getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); Collection followerIds = leader.getFollowerIds(); List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); - for(String id: followerIds) { + for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), @@ -471,24 +471,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // it can happen that the state has not changed but the leader has changed. Optional roleChangeNotifier = getRoleChangeNotifier(); - if(!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) || - oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { - if(roleChangeNotifier.isPresent()) { + if (!Objects.equals(lastLeaderId, currentBehavior.getLeaderId()) + || oldBehaviorState.getLeaderPayloadVersion() != currentBehavior.getLeaderPayloadVersion()) { + if (roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), currentBehavior.getLeaderId(), currentBehavior.getLeaderPayloadVersion()), getSelf()); } onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); - if(leadershipTransferInProgress != null) { + if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId()); } - if (roleChangeNotifier.isPresent() && - (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { + if (roleChangeNotifier.isPresent() + && (oldBehavior == null || oldBehavior.state() != currentBehavior.state())) { roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName , currentBehavior.state().name()), getSelf()); } @@ -502,19 +502,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { public long snapshotSequenceNr() { // When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal, // so that we can delete the persistent journal based on the saved sequence-number - // However , when akka replays the journal during recovery, it replays it from the sequence number when the snapshot - // was saved and not the number we saved. - // We would want to override it , by asking akka to use the last-sequence number known to us. + // However , when akka replays the journal during recovery, it replays it from the sequence number when the + // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the + // last-sequence number known to us. return context.getSnapshotManager().getLastSequenceNumber(); } /** * When a derived RaftActor needs to persist something it must call * persistData. - * - * @param clientActor - * @param identifier - * @param data */ protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { @@ -522,14 +518,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); - } + LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); final RaftActorContext raftContext = getRaftActorContext(); replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { - if (!hasFollowers()){ + if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); raftContext.setLastApplied(replicatedLogEntry1.getIndex()); @@ -570,7 +564,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Derived actors can call the isLeader method to check if the current - * RaftActor is the Leader or not + * RaftActor is the Leader or not. * * @return true it this RaftActor is a Leader false otherwise */ @@ -579,8 +573,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected final boolean isLeaderActive() { - return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader && - !shuttingDown && !isLeadershipTransferInProgress(); + return getRaftState() != RaftState.IsolatedLeader && getRaftState() != RaftState.PreLeader + && !shuttingDown && !isLeadershipTransferInProgress(); } private boolean isLeadershipTransferInProgress() { @@ -594,10 +588,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @return A reference to the leader if known, null otherwise */ - protected ActorSelection getLeader(){ + public ActorSelection getLeader() { String leaderAddress = getLeaderAddress(); - if(leaderAddress == null){ + if (leaderAddress == null) { return null; } @@ -605,10 +599,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** + * Returns the id of the current leader. * * @return the current leader's id */ - protected final String getLeaderId(){ + protected final String getLeaderId() { return getCurrentBehavior().getLeaderId(); } @@ -617,7 +612,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return getCurrentBehavior().state(); } - protected Long getCurrentTerm(){ + protected Long getCurrentTerm() { return context.getTermInformation().getCurrentTerm(); } @@ -628,10 +623,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void updateConfigParams(ConfigParams configParams) { // obtain the RaftPolicy for oldConfigParams and the updated one. - String oldRaftPolicy = context.getConfigParams(). - getCustomRaftPolicyImplementationClass(); - String newRaftPolicy = configParams. - getCustomRaftPolicyImplementationClass(); + String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass(); + String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass(); LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(), oldRaftPolicy, newRaftPolicy); @@ -645,7 +638,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { String previousLeaderId = behavior.getLeaderId(); short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion(); - LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), previousLeaderId); + LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(), + previousLeaderId); changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion)); } else { @@ -664,14 +658,14 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected void setPersistence(boolean persistent) { DataPersistenceProvider currentPersistence = persistence(); - if(persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { + if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); - if(getCurrentBehavior() != null) { + if (getCurrentBehavior() != null) { LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId()); captureSnapshot(); } - } else if(!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { + } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { setPersistence(new NonPersistentDataProvider() { /** * The way snapshotting works is, @@ -685,7 +679,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * */ @Override - public void saveSnapshot(Object o) { + public void saveSnapshot(Object object) { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. @@ -697,25 +691,24 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * setPeerAddress sets the address of a known peer at a later time. + * *

* This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *

* Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. - * - * @param peerId - * @param peerAddress */ - protected void setPeerAddress(String peerId, String peerAddress){ + protected void setPeerAddress(String peerId, String peerAddress) { context.setPeerAddress(peerId, peerAddress); } /** * The applyState method will be called by the RaftActor when some data - * needs to be applied to the actor's state + * needs to be applied to the actor's state. * * @param clientActor A reference to the client who sent this message. This * is the same reference that was passed to persistData @@ -757,7 +750,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onStateChanged(); /** - * Notifier Actor for this RaftActor to notify when a role change happens + * Notifier Actor for this RaftActor to notify when a role change happens. + * * @return ActorRef - ActorRef of the notifier or Optional.absent if none. */ protected abstract Optional getRoleChangeNotifier(); @@ -768,6 +762,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * work prior to performing the operation. On completion of any work, the run method must be called on the * given Runnable to proceed with the given operation. Important: the run method must be called on * this actor's thread dispatcher as as it modifies internal state. + * *

* The default implementation immediately runs the operation. * @@ -778,11 +773,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } protected void onLeaderChanged(String oldLeader, String newLeader) { + } - }; - - private String getLeaderAddress(){ - if(isLeader()){ + private String getLeaderAddress() { + if (isLeader()) { return getSelf().path().toString(); } String leaderId = getLeaderId(); @@ -790,15 +784,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return null; } String peerAddress = context.getPeerAddress(leaderId); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", - persistenceId(), leaderId, peerAddress); - } + LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress); return peerAddress; } - protected boolean hasFollowers(){ + protected boolean hasFollowers() { return getRaftActorContext().hasFollowers(); } @@ -844,67 +835,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - /** - * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.persisted.DeleteEntries} - * whose type for fromIndex is long instead of int. This class was kept for backwards - * compatibility with Helium. - */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class DeleteEntries implements Serializable { - private final int fromIndex; - - public DeleteEntries(int fromIndex) { - this.fromIndex = fromIndex; - } - - public int getFromIndex() { - return fromIndex; - } - - private Object readResolve() { - return org.opendaylight.controller.cluster.raft.persisted.DeleteEntries.createMigrated(fromIndex); - } - } - - /** - * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm} - * which has serialVersionUID set. This class was kept for backwards compatibility with Helium. - */ - // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility. - @SuppressWarnings("serial") - @Deprecated - static class UpdateElectionTerm implements Serializable { - private final long currentTerm; - private final String votedFor; - - public UpdateElectionTerm(long currentTerm, String votedFor) { - this.currentTerm = currentTerm; - this.votedFor = votedFor; - } - - public long getCurrentTerm() { - return currentTerm; - } - - public String getVotedFor() { - return votedFor; - } - - private Object readResolve() { - return org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm.createMigrated( - currentTerm, votedFor); - } - } - /** * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors. */ - private static abstract class BehaviorState implements Immutable { + private abstract static class BehaviorState implements Immutable { @Nullable abstract RaftActorBehavior getBehavior(); + @Nullable abstract String getLastValidLeaderId(); + @Nullable abstract String getLastLeaderId(); + @Nullable abstract short getLeaderPayloadVersion(); }