X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorLeadershipTransferCohort.java;h=c3d5af55cd401d2e66507d61983446a5e7e1b58b;hb=d0f46920468c8e4b67c68bd9058572b2d10d75f1;hp=623fa4902ca6fc45ce5d86630ef620afde3b9c10;hpb=4e000b89c3b5ac555cb1e2c39e999a8633b48a96;p=controller.git
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
index 623fa4902c..c3d5af55cd 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java
@@ -11,11 +11,12 @@ import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -26,6 +27,7 @@ import scala.concurrent.duration.FiniteDuration;
/**
* A raft actor support class that participates in leadership transfer. An instance is created upon
* initialization of leadership transfer.
+ *
*
* The transfer process is as follows:
*
@@ -35,32 +37,48 @@ import scala.concurrent.duration.FiniteDuration;
* their local RoleChangeNotifiers.
* - Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
* instance. This allows derived classes to perform work prior to transferring leadership.
- * - When the pause is complete, the {@link #run} method is called which in turn calls
- * {@link Leader#transferLeadership}.
+ * - When the pause is complete, the run method is called which in turn calls
+ * {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.
* - The Leader calls {@link #transferComplete} on successful completion.
* - Wait a short period of time for the new leader to be elected to give the derived class a chance to
* possibly complete work that was suspended while we were transferring.
* - On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.
*
+ *
*
* NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
* internal state.
*
* @author Thomas Pantelis
*/
-public class RaftActorLeadershipTransferCohort implements Runnable {
+public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
- private final RaftActor raftActor;
- private final ActorRef replyTo;
- private Cancellable newLeaderTimer;
+ static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
+
private final List onCompleteCallbacks = new ArrayList<>();
- private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+ private final RaftActor raftActor;
+ private final String requestedFollowerId;
+
+ private long newLeaderTimeoutInMillis = 2000;
+ private Cancellable newLeaderTimer;
+ private boolean isTransferring;
+
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+ this(raftActor, null);
+ }
- RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor, final @Nullable String requestedFollowerId) {
this.raftActor = raftActor;
- this.replyTo = replyTo;
+ this.requestedFollowerId = requestedFollowerId;
+
+ // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into
+ // account the variance.
+ final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
+ .getElectionTimeOutInterval().toMillis();
+ final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
+ newLeaderTimeoutInMillis = 2 * (electionTimeout + variance);
}
void init() {
@@ -70,30 +88,42 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
transferTimer.start();
Optional roleChangeNotifier = raftActor.getRoleChangeNotifier();
- if(roleChangeNotifier.isPresent()) {
+ if (roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
currentBehavior.getLeaderPayloadVersion()), raftActor.self());
}
- LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
- for(String peerId: context.getPeerIds()) {
+ for (String peerId: context.getPeerIds()) {
ActorSelection followerActor = context.getPeerActorSelection(peerId);
- if(followerActor != null) {
- followerActor.tell(leaderTransitioning, context.getActor());
+ if (followerActor != null) {
+ followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
}
}
- raftActor.pauseLeader(this);
+ raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+ @Override
+ protected void doRun() {
+ LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
+ doTransfer();
+ }
+
+ @Override
+ protected void doCancel() {
+ LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
+ doTransfer();
+ }
+ });
}
/**
- * This method is invoked to run the leadership transfer.
+ * This method is invoked to perform the leadership transfer.
*/
- @Override
- public void run() {
+ @VisibleForTesting
+ void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
- if(behavior instanceof Leader) {
+ if (behavior instanceof Leader) {
+ isTransferring = true;
((Leader)behavior).transferLeadership(this);
} else {
LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
@@ -121,60 +151,67 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
// and convert to follower due to higher term. We should then get an AppendEntries heart
// beat with the new leader id.
- // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
- // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
- // safely run on actor's thread dispatcher.
+ // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
+ // which executes it safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
- new Runnable() {
- @Override
- public void run() {
- LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
- finish(true);
- }
- }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ (Runnable) () -> {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
}
- void onNewLeader(String newLeader) {
- if(newLeader != null && newLeaderTimer != null) {
+ void onNewLeader(final String newLeader) {
+ if (newLeader != null && newLeaderTimer != null) {
LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
newLeaderTimer.cancel();
finish(true);
}
}
- private void finish(boolean success) {
- if(transferTimer.isRunning()) {
+ private void finish(final boolean success) {
+ isTransferring = false;
+ if (transferTimer.isRunning()) {
transferTimer.stop();
- if(success) {
+ if (success) {
LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
- raftActor.getLeaderId(), transferTimer.toString());
+ raftActor.getLeaderId(), transferTimer);
} else {
- LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
- transferTimer.toString());
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
+ raftActor.unpauseLeader();
}
}
- for(OnComplete onComplete: onCompleteCallbacks) {
- if(success) {
- onComplete.onSuccess(raftActor.self(), replyTo);
+ for (OnComplete onComplete: onCompleteCallbacks) {
+ if (success) {
+ onComplete.onSuccess(raftActor.self());
} else {
- onComplete.onFailure(raftActor.self(), replyTo);
+ onComplete.onFailure(raftActor.self());
}
}
}
- void addOnComplete(OnComplete onComplete) {
+ void addOnComplete(final OnComplete onComplete) {
onCompleteCallbacks.add(onComplete);
}
- @VisibleForTesting
- void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
- this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ boolean isTransferring() {
+ return isTransferring;
+ }
+
+ void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
+ if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
+ this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ }
+ }
+
+ public Optional getRequestedFollowerId() {
+ return Optional.ofNullable(requestedFollowerId);
}
interface OnComplete {
- void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
- void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+ void onSuccess(ActorRef raftActorRef);
+
+ void onFailure(ActorRef raftActorRef);
}
}