X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=bc4c77af566c44a799a9c4ce8f0489d5912d3c37;hp=70a0b86952a3ea22e04921860ae7a2bd5ef97dc1;hb=d6d516aa953924121c3cf2a5bf9fd992b9c2b326;hpb=0c05dff15e4f36c5ecbd26e82309de21f67c8cd5 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 70a0b86952..bc4c77af56 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -6,32 +6,36 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; import akka.actor.Status; +import akka.persistence.JournalProtocol; +import akka.persistence.SnapshotProtocol; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -46,7 +50,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; @@ -121,21 +124,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private RaftActorServerConfigurationSupport serverConfigurationSupport; - private RaftActorLeadershipTransferCohort leadershipTransferInProgress; - private boolean shuttingDown; - protected RaftActor(String id, Map peerAddresses, - Optional configParams, short payloadVersion) { + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") + protected RaftActor(final String id, final Map peerAddresses, + final Optional configParams, final short payloadVersion) { persistentProvider = new PersistentDataProvider(this); delegatingPersistenceProvider = new RaftActorDelegatingPersistentDataProvider(null, persistentProvider); - context = new RaftActorContextImpl(this.getSelf(), - this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), - -1, -1, peerAddresses, + context = new RaftActorContextImpl(getSelf(), getContext(), id, + new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, this::handleApplyState, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG, this::executeInSelf); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -153,13 +154,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @Override - public void postStop() { + public void postStop() throws Exception { context.close(); super.postStop(); } @Override - protected void handleRecover(Object message) { + protected void handleRecover(final Object message) { if (raftRecovery == null) { raftRecovery = newRaftActorRecoverySupport(); } @@ -185,7 +186,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @VisibleForTesting @SuppressWarnings("checkstyle:IllegalCatch") - protected void changeCurrentBehavior(RaftActorBehavior newBehavior) { + protected void changeCurrentBehavior(final RaftActorBehavior newBehavior) { final RaftActorBehavior currentBehavior = getCurrentBehavior(); if (currentBehavior != null) { try { @@ -215,7 +216,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Handles a message. * * @deprecated This method is not final for testing purposes. DO NOT OVERRIDE IT, override - * {@link #handleNonRaftCommand(Object)} instead. + * {@link #handleNonRaftCommand(Object)} instead. */ @Deprecated @Override @@ -246,12 +247,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); persistence().persistAsync(applyEntries, NoopProcedure.instance()); - } else if (message instanceof FindLeader) { - getSender().tell( - new FindLeaderReply(getLeaderAddress()), - getSelf() - ); + getSender().tell(new FindLeaderReply(getLeaderAddress()), getSelf()); } else if (message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); } else if (message instanceof InitiateCaptureSnapshot) { @@ -269,7 +266,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof RequestLeadership) { onRequestLeadership((RequestLeadership) message); } else if (!possiblyHandleBehaviorMessage(message)) { - handleNonRaftCommand(message); + if (message instanceof JournalProtocol.Response + && delegatingPersistenceProvider.handleJournalResponse((JournalProtocol.Response) message)) { + LOG.debug("{}: handled a journal response", persistenceId()); + } else if (message instanceof SnapshotProtocol.Response + && delegatingPersistenceProvider.handleSnapshotResponse((SnapshotProtocol.Response) message)) { + LOG.debug("{}: handled a snapshot response", persistenceId()); + } else { + handleNonRaftCommand(message); + } } } @@ -279,7 +284,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // non-leader cannot satisfy leadership request LOG.warn("{}: onRequestLeadership {} was sent to non-leader." + " Current behavior: {}. Sending failure response", - persistenceId(), getCurrentBehavior().state()); + persistenceId(), message, getCurrentBehavior().state()); message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to " + message.getRequestedFollowerId() + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf()); @@ -309,7 +314,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { + ". Follower is not ready to become leader")), getSelf()); } - }, message.getRequestedFollowerId()); + }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } private boolean possiblyHandleBehaviorMessage(final Object message) { @@ -328,30 +333,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return false; } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) { - initiateLeadershipTransfer(onComplete, null); - } - private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete, - final String followerId) { + final @Nullable String followerId, final long newLeaderTimeoutInMillis) { LOG.debug("{}: Initiating leader transfer", persistenceId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress == null) { leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId); + leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis); leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + public void onSuccess(final ActorRef raftActorRef) { + context.setRaftActorLeadershipTransferCohort(null); } @Override - public void onFailure(ActorRef raftActorRef) { - leadershipTransferInProgress = null; + public void onFailure(final ActorRef raftActorRef) { + context.setRaftActorLeadershipTransferCohort(null); } }); leadershipTransferInProgress.addOnComplete(onComplete); + + context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress); leadershipTransferInProgress.init(); + } else { LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId()); leadershipTransferInProgress.addOnComplete(onComplete); @@ -368,26 +374,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { shuttingDown = true; final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - if (currentBehavior.state() != RaftState.Leader) { - // For non-leaders shutdown is a no-op - self().tell(PoisonPill.getInstance(), self()); - return; + switch (currentBehavior.state()) { + case Leader: + case PreLeader: + // Fall-through to more work + break; + default: + // For non-leaders shutdown is a no-op + self().tell(PoisonPill.getInstance(), self()); + return; } if (context.hasFollowers()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { + public void onSuccess(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } @Override - public void onFailure(ActorRef raftActorRef) { + public void onFailure(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId()); raftActorRef.tell(PoisonPill.getInstance(), raftActorRef); } - }); + }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS)); } else { pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) { @Override @@ -413,13 +424,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void switchBehavior(SwitchBehavior message) { + private void switchBehavior(final SwitchBehavior message) { if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); if (newState == RaftState.Leader || newState == RaftState.Follower) { + getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); - getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); } else { LOG.warn("Switching to behavior : {} - not supported", newState); } @@ -447,7 +458,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -476,11 +487,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (getCurrentBehavior() instanceof AbstractLeader) { AbstractLeader leader = (AbstractLeader)getCurrentBehavior(); Collection followerIds = leader.getFollowerIds(); - List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); + List followerInfoList = new ArrayList<>(followerIds.size()); for (String id: followerIds) { final FollowerLogInformation info = leader.getFollower(id); followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), - info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS( + TimeUnit.NANOSECONDS.toMillis(info.nanosSinceLastActivity())), context.getPeerInfo(info.getId()).isVoting())); } @@ -491,11 +503,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } - protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { return OnDemandRaftState.builder(); } - private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { + private void handleBehaviorChange(final BehaviorState oldBehaviorState, final RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); if (oldBehavior != currentBehavior) { @@ -517,6 +529,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId()); + RaftActorLeadershipTransferCohort leadershipTransferInProgress = + context.getRaftActorLeadershipTransferCohort(); if (leadershipTransferInProgress != null) { leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId()); } @@ -531,7 +545,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void handleApplyState(ApplyState applyState) { + private void handleApplyState(final ApplyState applyState) { long startTime = System.nanoTime(); Payload payload = applyState.getReplicatedLogEntry().getData(); @@ -554,7 +568,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { self().tell(applyState, self()); } - protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { + protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, + final short leaderPayloadVersion) { return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -631,7 +646,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } @VisibleForTesting - void setCurrentBehavior(RaftActorBehavior behavior) { + void setCurrentBehavior(final RaftActorBehavior behavior) { context.setCurrentBehavior(behavior); } @@ -654,7 +669,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { && !shuttingDown && !isLeadershipTransferInProgress(); } - private boolean isLeadershipTransferInProgress() { + protected boolean isLeadershipTransferInProgress() { + RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort(); return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring(); } @@ -697,7 +713,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return context; } - protected void updateConfigParams(ConfigParams configParams) { + protected void updateConfigParams(final ConfigParams configParams) { // obtain the RaftPolicy for oldConfigParams and the updated one. String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass(); @@ -729,11 +745,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return delegatingPersistenceProvider.getDelegate(); } - public void setPersistence(DataPersistenceProvider provider) { + public void setPersistence(final DataPersistenceProvider provider) { delegatingPersistenceProvider.setDelegate(provider); } - protected void setPersistence(boolean persistent) { + protected void setPersistence(final boolean persistent) { DataPersistenceProvider currentPersistence = persistence(); if (persistent && (currentPersistence == null || !currentPersistence.isRecoveryApplicable())) { setPersistence(new PersistentDataProvider(this)); @@ -743,8 +759,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { captureSnapshot(); } } else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) { - setPersistence(new NonPersistentDataProvider() { - /** + setPersistence(new NonPersistentDataProvider(this) { + /* * The way snapshotting works is, *
    *
  1. RaftActor calls createSnapshot on the Shard @@ -756,7 +772,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { *
*/ @Override - public void saveSnapshot(Object object) { + public void saveSnapshot(final Object object) { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. @@ -779,7 +795,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. */ - protected void setPeerAddress(String peerId, String peerAddress) { + protected void setPeerAddress(final String peerId, final String peerAddress) { context.setPeerAddress(peerId, peerAddress); } @@ -805,8 +821,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - @Nonnull - protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); + protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** * This method is called when recovery is complete. @@ -816,8 +831,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { /** * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ - @Nonnull - protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); + protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -833,6 +847,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract Optional getRoleChangeNotifier(); + /** + * This method is called on the leader when a voting change operation completes. + */ + protected void onVotingStateChangeComplete() { + } + /** * This method is called prior to operations such as leadership transfer and actor shutdown when the leader * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current @@ -845,11 +865,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * * @param operation the operation to run */ - protected void pauseLeader(Runnable operation) { + protected void pauseLeader(final Runnable operation) { operation.run(); } - protected void onLeaderChanged(String oldLeader, String newLeader) { + /** + * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader + * should resume normal operations. + * + *

+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked. + */ + protected void unpauseLeader() { + + } + + protected void onLeaderChanged(final String oldLeader, final String newLeader) { } private String getLeaderAddress() { @@ -878,7 +909,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}", replicatedLog().last(), idx); - snapshotManager.capture(replicatedLog().last(), idx); + snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx); } } @@ -890,13 +921,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (isLeader()) { initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() { @Override - public void onSuccess(ActorRef raftActorRef) { + public void onSuccess(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId()); ensureFollowerState(); } @Override - public void onFailure(ActorRef raftActorRef) { + public void onFailure(final ActorRef raftActorRef) { LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId()); ensureFollowerState(); } @@ -908,7 +939,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { initializeBehavior(); } } - }); + }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT); } } @@ -922,7 +953,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Nullable abstract String getLastLeaderId(); - @Nullable abstract short getLeaderPayloadVersion(); + abstract short getLeaderPayloadVersion(); } /** @@ -938,8 +969,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { final RaftActorBehavior behavior) { this.lastValidLeaderId = lastValidLeaderId; this.lastLeaderId = lastLeaderId; - this.behavior = Preconditions.checkNotNull(behavior); - this.leaderPayloadVersion = behavior.getLeaderPayloadVersion(); + this.behavior = requireNonNull(behavior); + leaderPayloadVersion = behavior.getLeaderPayloadVersion(); } @Override @@ -1001,7 +1032,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { BehaviorState capture(final RaftActorBehavior behavior) { if (behavior == null) { - Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); + verify(lastValidLeaderId == null, "Null behavior with non-null last leader"); return NULL_BEHAVIOR_STATE; } @@ -1013,5 +1044,4 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior); } } - }