X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=f9099a7b818a7e8b5004157a4b73da0f90e57ac9;hp=46551506e337182df1c0f7a05e2a6fe1e11043ab;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hpb=fa96da71c5ab10973a9f93c2e8b35494b96fd7ed diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100644 new mode 100755 index 46551506e3..f9099a7b81 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -49,8 +50,11 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -117,12 +121,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; - private RaftActorLeadershipTransferCohort leadershipTransferInProgress; - private boolean shuttingDown; - protected RaftActor(String id, Map peerAddresses, - Optional configParams, short payloadVersion) { + protected RaftActor(final String id, final Map peerAddresses, + final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); @@ -131,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -155,7 +157,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - protected void handleRecover(Object message) { + protected void handleRecover(final Object message) { if (raftRecovery == null) { raftRecovery = newRaftActorRecoverySupport(); } @@ -181,7 +183,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @VisibleForTesting @SuppressWarnings("checkstyle:IllegalCatch") - protected void changeCurrentBehavior(RaftActorBehavior newBehavior) { + protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); if (currentBehavior != null) { try { @@ -223,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long startTime = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); - } - - if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); - } - - long elapsedTime = System.nanoTime() - startTime; - if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { - LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } - if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -256,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } - // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); @@ -277,18 +257,59 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning(); + onLeaderTransitioning((LeaderTransitioning)message); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message); + persistData(null, null, (NoopPayload) message, false); + } else if (message instanceof RequestLeadership) { + onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } } + private void onRequestLeadership(final RequestLeadership message) { + LOG.debug("{}: onRequestLeadership {}", persistenceId(), message); + if (!isLeader()) { + // non-leader cannot satisfy leadership request + LOG.warn("{}: onRequestLeadership {} was sent to non-leader." + + " Current behavior: {}. Sending failure response", + persistenceId(), getCurrentBehavior().state()); + message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to " + + message.getRequestedFollowerId() + + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf()); + return; + } + + final String requestedFollowerId = message.getRequestedFollowerId(); + final ActorRef replyTo = message.getReplyTo(); + initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { + @Override + public void onSuccess(final ActorRef raftActorRef) { + // sanity check + if (!requestedFollowerId.equals(getLeaderId())) { + onFailure(raftActorRef); + } + + LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Success(null), getSelf()); + } + + @Override + public void onFailure(final ActorRef raftActorRef) { + LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId); + replyTo.tell(new Status.Failure( + new LeadershipTransferFailedException( + "Failed to transfer leadership to " + requestedFollowerId + + ". Follower is not ready to become leader")), + getSelf()); + } + }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); + } + private boolean possiblyHandleBehaviorMessage(final Object message) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); final BehaviorState state = behaviorStateTracker.capture(currentBehavior); @@ -305,25 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return false; } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { + private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, + @Nullable final String followerId, final long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress == null) { - leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this); + leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); + leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + public void onSuccess(final ActorRef raftActorRef) { + context.setRaftActorLeadershipTransferCohort(null); } @Override - public void onFailure(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + public void onFailure(final ActorRef raftActorRef) { + context.setRaftActorLeadershipTransferCohort(null); } }); leadershipTransferInProgress.addOnComplete(onComplete); + + context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress); leadershipTransferInProgress.init(); + } else { LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); leadershipTransferInProgress.addOnComplete(onComplete); @@ -340,26 +367,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (currentBehavior.state() != RaftState.Leader) { - // For non-leaders shutdown is a no-op - self().tell(PoisonPill.getInstance(), self()); - return; + switch (currentBehavior.state()) { + case Leader: + case PreLeader: + // Fall-through to more work + break; + default: + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; } if (context.hasFollowers()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { + public void onSuccess(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } @Override - public void onFailure(ActorRef raftActorRef) { + public void onFailure(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } - }); + }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS)); } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override @@ -375,22 +407,23 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void onLeaderTransitioning() { - LOG.debug("{}: onLeaderTransitioning", persistenceId()); + private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) { + LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning); Optional roleChangeNotifier = getRoleChangeNotifier(); - if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() + && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } } - private void switchBehavior(SwitchBehavior message) { + private void switchBehavior(final SwitchBehavior message) { if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); - if ( newState == RaftState.Leader || newState == RaftState.Follower) { + if (newState == RaftState.Leader || newState == RaftState.Follower) { + getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); - getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); } else { LOG.warn("Switching to behavior : {} - not supported", newState); } @@ -418,7 +451,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -451,7 +484,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( + TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), context.getPeerInfo(info.getId()).isVoting())); } @@ -462,7 +496,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandRaftState.builder(); + } + + private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); if (oldBehavior != currentBehavior) { @@ -484,6 +522,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = + context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } @@ -498,7 +538,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { + private void handleApplyState(final ApplyState applyState) { + long startTime = System.nanoTime(); + + Payload payload = applyState.getReplicatedLogEntry().getData(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + } + + if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), payload); + } + + long elapsedTime = System.nanoTime() - startTime; + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + + // Send the ApplyState message back to self to handle further processing asynchronously. + self().tell(applyState, self()); + } + + protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + final short leaderPayloadVersion) { return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -513,12 +577,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** - * When a derived RaftActor needs to persist something it must call - * persistData. + * Persists the given Payload in the journal and replicates to any followers. After successful completion, + * {@link #applyState(ActorRef, Identifier, Object)} is notified. + * + * @param clientActor optional ActorRef that is provided via the applyState callback + * @param identifier the payload identifier + * @param data the payload data to persist + * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with + * subsequent payloads for efficiency. Otherwise the payload is immediately replicated. */ - protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { - - ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data, + final boolean batchHint) { + ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); replicatedLogEntry.setPersistencePending(true); @@ -537,7 +607,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { raftContext.setLastApplied(persistedLogEntry.getIndex()); // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self()); + handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage @@ -555,7 +625,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (wasAppended && hasFollowers()) { // Send log entry for replication. - getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry)); + getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, + !batchHint)); } } @@ -568,7 +639,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - void setCurrentBehavior(RaftActorBehavior behavior) { + void setCurrentBehavior(final RaftActorBehavior behavior) { context.setCurrentBehavior(behavior); } @@ -591,7 +662,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { && !shuttingDown && !isLeadershipTransferInProgress(); } - private boolean isLeadershipTransferInProgress() { + protected boolean isLeadershipTransferInProgress() { + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } @@ -634,7 +706,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return context; } - protected void updateConfigParams(ConfigParams configParams) { + protected void updateConfigParams(final ConfigParams configParams) { // obtain the RaftPolicy for oldConfigParams and the updated one. String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass(); @@ -666,11 +738,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return delegatingPersistenceProvider.getDelegate(); } - public void setPersistence(DataPersistenceProvider provider) { + public void setPersistence(final DataPersistenceProvider provider) { delegatingPersistenceProvider.setDelegate(provider); } - protected void setPersistence(boolean persistent) { + protected void setPersistence(final boolean persistent) { DataPersistenceProvider currentPersistence = persistence(); if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); @@ -680,8 +752,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { - setPersistence(new NonPersistentDataProvider() { - /** + setPersistence(new NonPersistentDataProvider(this) { + /* * The way snapshotting works is, *
    *
  1. RaftActor calls createSnapshot on the Shard @@ -693,7 +765,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { *
*/ @Override - public void saveSnapshot(Object object) { + public void saveSnapshot(final Object object) { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. @@ -716,7 +788,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. */ - protected void setPeerAddress(String peerId, String peerAddress) { + protected void setPeerAddress(final String peerId, final String peerAddress) { context.setPeerAddress(peerId, peerAddress); } @@ -751,7 +823,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * Returns the RaftActorSnapshotCohort to participate in persistence recovery. + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ @Nonnull protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); @@ -770,6 +842,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract Optional getRoleChangeNotifier(); + /** + * This method is called on the leader when a voting change operation completes. + */ + protected void onVotingStateChangeComplete() { + } + /** * This method is called prior to operations such as leadership transfer and actor shutdown when the leader * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current @@ -782,11 +860,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @param operation the operation to run */ - protected void pauseLeader(Runnable operation) { + protected void pauseLeader(final Runnable operation) { operation.run(); } - protected void onLeaderChanged(String oldLeader, String newLeader) { + /** + * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader + * should resume normal operations. + * + *

+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked. + */ + protected void unpauseLeader() { + + } + + protected void onLeaderChanged(final String oldLeader, final String newLeader) { } private String getLeaderAddress() { @@ -827,13 +916,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (isLeader()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { + public void onSuccess(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId()); ensureFollowerState(); } @Override - public void onFailure(ActorRef raftActorRef) { + public void onFailure(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId()); ensureFollowerState(); } @@ -845,7 +934,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); } } - }); + }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } } @@ -950,5 +1039,4 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior); } } - }