X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=6fd0693db22c77ae75ec68f7110ff0d427d6fcba;hb=73ab61a037dd2489600acbc1eaf6f9ee549c204a;hp=57eb2647c9ebbc3ab345b93b16bcbfde44a65b1c;hpb=b2cb02f62ab7c7599e8d94fe92d1ce63e17d599b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 57eb2647c9..6fd0693db2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -6,34 +6,38 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.mgmt.api.FollowerInfo; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -44,24 +48,25 @@ import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * * In Search of an Understandable Consensus Algorithm - *
+ * + ** RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *
* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - *
- * + * + ** The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - *
- * + * + ** Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - *
- * + * + ** The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - *
** This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *
* Note that if the peerId does not match the list of peers passed to
* this actor during construction an IllegalStateException will be thrown.
- *
- * @param peerId
- * @param peerAddress
*/
- protected void setPeerAddress(String peerId, String peerAddress){
+ protected void setPeerAddress(final String peerId, final String peerAddress) {
context.setPeerAddress(peerId, peerAddress);
}
/**
* The applyState method will be called by the RaftActor when some data
- * needs to be applied to the actor's state
+ * needs to be applied to the actor's state.
*
* @param clientActor A reference to the client who sent this message. This
* is the same reference that was passed to persistData
@@ -741,8 +816,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
/**
* Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- @Nonnull
- protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
+ protected abstract @NonNull RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
@@ -750,10 +824,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected abstract void onRecoveryComplete();
/**
- * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
+ * Returns the RaftActorSnapshotCohort to participate in snapshot captures.
*/
- @Nonnull
- protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
+ protected abstract @NonNull RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
@@ -763,32 +836,50 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected abstract void onStateChanged();
/**
- * Notifier Actor for this RaftActor to notify when a role change happens
+ * Notifier Actor for this RaftActor to notify when a role change happens.
+ *
* @return ActorRef - ActorRef of the notifier or Optional.absent if none.
*/
protected abstract Optional
* The default implementation immediately runs the operation.
*
* @param operation the operation to run
*/
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
operation.run();
}
- protected void onLeaderChanged(String oldLeader, String newLeader) {
+ /**
+ * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader
+ * should resume normal operations.
+ *
+ *
+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
+ */
+ protected void unpauseLeader() {
}
- private String getLeaderAddress(){
- if(isLeader()){
+ protected void onLeaderChanged(final String oldLeader, final String newLeader) {
+ }
+
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
String leaderId = getLeaderId();
@@ -796,15 +887,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
return peerAddress;
}
- protected boolean hasFollowers(){
+ protected boolean hasFollowers() {
return getRaftActorContext().hasFollowers();
}
@@ -816,7 +904,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
replicatedLog().last(), idx);
- snapshotManager.capture(replicatedLog().last(), idx);
+ snapshotManager.captureWithForcedTrim(replicatedLog().last(), idx);
}
}
@@ -828,13 +916,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
if (isLeader()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef) {
+ public void onSuccess(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
ensureFollowerState();
}
@Override
- public void onFailure(ActorRef raftActorRef) {
+ public void onFailure(final ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
ensureFollowerState();
}
@@ -846,18 +934,21 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
initializeBehavior();
}
}
- });
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
}
/**
* A point-in-time capture of {@link RaftActorBehavior} state critical for transitioning between behaviors.
*/
- private static abstract class BehaviorState implements Immutable {
+ private abstract static class BehaviorState implements Immutable {
@Nullable abstract RaftActorBehavior getBehavior();
+
@Nullable abstract String getLastValidLeaderId();
+
@Nullable abstract String getLastLeaderId();
- @Nullable abstract short getLeaderPayloadVersion();
+
+ abstract short getLeaderPayloadVersion();
}
/**
@@ -873,8 +964,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
final RaftActorBehavior behavior) {
this.lastValidLeaderId = lastValidLeaderId;
this.lastLeaderId = lastLeaderId;
- this.behavior = Preconditions.checkNotNull(behavior);
- this.leaderPayloadVersion = behavior.getLeaderPayloadVersion();
+ this.behavior = requireNonNull(behavior);
+ leaderPayloadVersion = behavior.getLeaderPayloadVersion();
}
@Override
@@ -936,7 +1027,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
BehaviorState capture(final RaftActorBehavior behavior) {
if (behavior == null) {
- Verify.verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
+ verify(lastValidLeaderId == null, "Null behavior with non-null last leader");
return NULL_BEHAVIOR_STATE;
}
@@ -948,5 +1039,4 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
-
}