X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=1c057d755369d50a2147cf7c3e2922783ae56fc8;hb=aafb8cb044e992dd784d1f4f66508599cc4cd588;hp=0b3fca090cfbfeed1ca89282a406587648848730;hpb=214ba02ca4400d88e494fa27a44c30531f68968e;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 0b3fca090c..1c057d7553 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,19 +11,20 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.japi.Procedure; -import akka.persistence.SnapshotSelectionCriteria; +import akka.actor.PoisonPill; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Lists; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; @@ -32,11 +33,14 @@ import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; -import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -44,16 +48,22 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Immutable; /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * * In Search of an Understandable Consensus Algorithm - *
+ * + ** RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *
* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - *
- * + * + ** The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - *
- * + * + ** Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - *
- * + * + ** The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - *
** This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
- *
- * @param peerId
- * @param peerAddress
*/
- protected void setPeerAddress(String peerId, String peerAddress){
+ protected void setPeerAddress(String peerId, String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
/**
* The applyState method will be called by the RaftActor when some data
- * needs to be applied to the actor's state
+ * needs to be applied to the actor's state.
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
@@ -529,8 +752,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
* @param data A piece of data that was persisted by the persistData call.
* This should NEVER be null.
*/
- protected abstract void applyState(ActorRef clientActor, String identifier,
- Object data);
+ protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data);
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
@@ -544,7 +766,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected abstract void onRecoveryComplete();
/**
- * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+ * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
@Nonnull
protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
@@ -557,94 +779,191 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected abstract void onStateChanged();
/**
- * Notifier Actor for this RaftActor to notify when a role change happens
+ * Notifier Actor for this RaftActor to notify when a role change happens.
+ *
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
protected abstract Optional
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
+ */
+ protected void pauseLeader(Runnable operation) {
+ operation.run();
+ }
- private String getLeaderAddress(){
- if(isLeader()){
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+ }
+
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
- String leaderId = currentBehavior.getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
return peerAddress;
}
- protected boolean hasFollowers(){
+ protected boolean hasFollowers() {
return getRaftActorContext().hasFollowers();
}
+ private void captureSnapshot() {
+ SnapshotManager snapshotManager = context.getSnapshotManager();
+
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+ LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
+ replicatedLog().last(), idx);
+
+ snapshotManager.capture(replicatedLog().last(), idx);
+ }
+ }
+
/**
- * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries}
- * whose type for fromIndex is long instead of int. This class was kept for backwards
- * compatibility with Helium.
+ * Switch this member to non-voting status. This is a no-op for all behaviors except when we are the leader,
+ * in which case we need to step down.
*/
- // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
- @SuppressWarnings("serial")
- @Deprecated
- static class DeleteEntries implements Serializable {
- private final int fromIndex;
+ void becomeNonVoting() {
+ if (isLeader()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+ ensureFollowerState();
+ }
- public DeleteEntries(int fromIndex) {
- this.fromIndex = fromIndex;
- }
+ @Override
+ public void onFailure(ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+ ensureFollowerState();
+ }
- public int getFromIndex() {
- return fromIndex;
+ private void ensureFollowerState() {
+ // Whether or not leadership transfer succeeded, we have to step down as leader and
+ // switch to Follower so ensure that.
+ if (getRaftState() != RaftState.Follower) {
+ initializeBehavior();
+ }
+ }
+ });
}
}
/**
- * @deprecated Deprecated in favor of non-inner class {@link org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm}
- * which has serialVersionUID set. This class was kept for backwards compatibility with Helium.
+ * A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
- // Suppressing this warning as we can't set serialVersionUID to maintain backwards compatibility.
- @SuppressWarnings("serial")
- @Deprecated
- static class UpdateElectionTerm implements Serializable {
- private final long currentTerm;
- private final String votedFor;
+ private abstract static class BehaviorState implements Immutable {
+ @Nullable abstract RaftActorBehavior getBehavior();
- public UpdateElectionTerm(long currentTerm, String votedFor) {
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
- }
+ @Nullable abstract String getLastValidLeaderId();
- public long getCurrentTerm() {
- return currentTerm;
- }
+ @Nullable abstract String getLastLeaderId();
- public String getVotedFor() {
- return votedFor;
- }
+ @Nullable abstract short getLeaderPayloadVersion();
}
- private static class BehaviorStateHolder {
- private RaftActorBehavior behavior;
- private String leaderId;
-
- void init(RaftActorBehavior behavior) {
- this.behavior = behavior;
- this.leaderId = behavior != null ? behavior.getLeaderId() : null;
+ /**
+ * A {@link BehaviorState} corresponding to non-null {@link RaftActorBehavior} state.
+ */
+ private static final class SimpleBehaviorState extends BehaviorState {
+ private final RaftActorBehavior behavior;
+ private final String lastValidLeaderId;
+ private final String lastLeaderId;
+ private final short leaderPayloadVersion;
+
+ SimpleBehaviorState(final String lastValidLeaderId, final String lastLeaderId,
+ final RaftActorBehavior behavior) {
+ this.lastValidLeaderId = lastValidLeaderId;
+ this.lastLeaderId = lastLeaderId;
+ this.behavior = Preconditions.checkNotNull(behavior);
+ this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
+ @Override
RaftActorBehavior getBehavior() {
return behavior;
}
- String getLeaderId() {
- return leaderId;
+ @Override
+ String getLastValidLeaderId() {
+ return lastValidLeaderId;
+ }
+
+ @Override
+ short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
+ }
+
+ @Override
+ String getLastLeaderId() {
+ return lastLeaderId;
+ }
+ }
+
+ /**
+ * Class tracking behavior-related information, which we need to keep around and pass across behavior switches.
+ * An instance is created for each RaftActor. It has two functions:
+ * - it keeps track of the last leader ID we have encountered since we have been created
+ * - it creates state capture needed to transition from one behavior to the next
+ */
+ private static final class BehaviorStateTracker {
+ /**
+ * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
+ * allowed before we receive the first message, we know the leader ID to be null.
+ */
+ private static final BehaviorState NULL_BEHAVIOR_STATE = new BehaviorState() {
+ @Override
+ RaftActorBehavior getBehavior() {
+ return null;
+ }
+
+ @Override
+ String getLastValidLeaderId() {
+ return null;
+ }
+
+ @Override
+ short getLeaderPayloadVersion() {
+ return -1;
+ }
+
+ @Override
+ String getLastLeaderId() {
+ return null;
+ }
+ };
+
+ private String lastValidLeaderId;
+ private String lastLeaderId;
+
+ BehaviorState capture(final RaftActorBehavior behavior) {
+ if (behavior == null) {
+ Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ return NULL_BEHAVIOR_STATE;
+ }
+
+ lastLeaderId = behavior.getLeaderId();
+ if (lastLeaderId != null) {
+ lastValidLeaderId = lastLeaderId;
+ }
+
+ return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
+
}