X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorLeadershipTransferCohort.java;h=59ae4d3069ba97085c3473b5581dbb608dde574f;hp=85980e2ca371fe752d1743cc0f3aab6df186628a;hb=0c05dff15e4f36c5ecbd26e82309de21f67c8cd5;hpb=26448d92f76ceb25424de4b33c6e251be3756812;ds=sidebyside diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 85980e2ca3..59ae4d3069 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -26,6 +27,7 @@ import scala.concurrent.duration.FiniteDuration; /** * A raft actor support class that participates in leadership transfer. An instance is created upon * initialization of leadership transfer. + * *

* The transfer process is as follows: *

    @@ -35,13 +37,14 @@ import scala.concurrent.duration.FiniteDuration; * their local RoleChangeNotifiers. *
  1. Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort * instance. This allows derived classes to perform work prior to transferring leadership.
  2. - *
  3. When the pause is complete, the {@link #run} method is called which in turn calls - * {@link Leader#transferLeadership}.
  4. + *
  5. When the pause is complete, the run method is called which in turn calls + * {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.
  6. *
  7. The Leader calls {@link #transferComplete} on successful completion.
  8. *
  9. Wait a short period of time for the new leader to be elected to give the derived class a chance to * possibly complete work that was suspended while we were transferring.
  10. *
  11. On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.
  12. *
+ * *

* NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify * internal state. @@ -51,15 +54,22 @@ import scala.concurrent.duration.FiniteDuration; public class RaftActorLeadershipTransferCohort { private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); - private final RaftActor raftActor; - private Cancellable newLeaderTimer; private final List onCompleteCallbacks = new ArrayList<>(); - private long newLeaderTimeoutInMillis = 2000; private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private final RaftActor raftActor; + private final String requestedFollowerId; + + private long newLeaderTimeoutInMillis = 2000; + private Cancellable newLeaderTimer; private boolean isTransferring; - RaftActorLeadershipTransferCohort(RaftActor raftActor) { + RaftActorLeadershipTransferCohort(final RaftActor raftActor) { + this(raftActor, null); + } + + RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) { this.raftActor = raftActor; + this.requestedFollowerId = requestedFollowerId; } void init() { @@ -69,15 +79,15 @@ public class RaftActorLeadershipTransferCohort { transferTimer.start(); Optional roleChangeNotifier = raftActor.getRoleChangeNotifier(); - if(roleChangeNotifier.isPresent()) { + if (roleChangeNotifier.isPresent()) { roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null, currentBehavior.getLeaderPayloadVersion()), raftActor.self()); } - for(String peerId: context.getPeerIds()) { + for (String peerId: context.getPeerIds()) { ActorSelection followerActor = context.getPeerActorSelection(peerId); - if(followerActor != null) { - followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor()); + if (followerActor != null) { + followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor()); } } @@ -102,7 +112,7 @@ public class RaftActorLeadershipTransferCohort { void doTransfer() { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); // Sanity check... - if(behavior instanceof Leader) { + if (behavior instanceof Leader) { isTransferring = true; ((Leader)behavior).transferLeadership(this); } else { @@ -136,28 +146,25 @@ public class RaftActorLeadershipTransferCohort { // safely run on the actor's thread dispatcher. FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), - new Runnable() { - @Override - public void run() { - LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); - finish(true); - } - }, raftActor.getContext().system().dispatcher(), raftActor.self()); + (Runnable) () -> { + LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); + finish(true); + }, raftActor.getContext().system().dispatcher(), raftActor.self()); } - void onNewLeader(String newLeader) { - if(newLeader != null && newLeaderTimer != null) { + void onNewLeader(final String newLeader) { + if (newLeader != null && newLeaderTimer != null) { LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader); newLeaderTimer.cancel(); finish(true); } } - private void finish(boolean success) { + private void finish(final boolean success) { isTransferring = false; - if(transferTimer.isRunning()) { + if (transferTimer.isRunning()) { transferTimer.stop(); - if(success) { + if (success) { LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(), raftActor.getLeaderId(), transferTimer); } else { @@ -165,8 +172,8 @@ public class RaftActorLeadershipTransferCohort { } } - for(OnComplete onComplete: onCompleteCallbacks) { - if(success) { + for (OnComplete onComplete: onCompleteCallbacks) { + if (success) { onComplete.onSuccess(raftActor.self()); } else { onComplete.onFailure(raftActor.self()); @@ -174,7 +181,7 @@ public class RaftActorLeadershipTransferCohort { } } - void addOnComplete(OnComplete onComplete) { + void addOnComplete(final OnComplete onComplete) { onCompleteCallbacks.add(onComplete); } @@ -183,12 +190,17 @@ public class RaftActorLeadershipTransferCohort { } @VisibleForTesting - void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { + void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) { this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; } + public Optional getRequestedFollowerId() { + return Optional.fromNullable(requestedFollowerId); + } + interface OnComplete { void onSuccess(ActorRef raftActorRef); + void onFailure(ActorRef raftActorRef); } }