X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorLeadershipTransferCohort.java;h=32790d0f47251af840193ebfe77fa51c4702ffd1;hp=77678e9f2423a2e8956e33a2483618d3b0d1d4dd;hb=000960f6451af770f5463e41e1fb6defb6f3ab27;hpb=db8fed63e18fccd2721fa7e189b2278a4f240f2c diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index 77678e9f24..32790d0f47 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -7,43 +7,188 @@ */ package org.opendaylight.controller.cluster.raft; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.Cancellable; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** - * A helper class that participates in raft actor leadership transfer. An instance is created upon + * A raft actor support class that participates in leadership transfer. An instance is created upon * initialization of leadership transfer. + * + *

+ * The transfer process is as follows: + *

    + *
  1. Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify + * clients that we no longer have a working leader.
  2. + *
  3. Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to + * their local RoleChangeNotifiers.
  4. + *
  5. Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort + * instance. This allows derived classes to perform work prior to transferring leadership.
  6. + *
  7. When the pause is complete, the run method is called which in turn calls + * {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.
  8. + *
  9. The Leader calls {@link #transferComplete} on successful completion.
  10. + *
  11. Wait a short period of time for the new leader to be elected to give the derived class a chance to + * possibly complete work that was suspended while we were transferring.
  12. + *
  13. On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.
  14. + *
+ * *

- * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state. + * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify + * internal state. * * @author Thomas Pantelis */ -public abstract class RaftActorLeadershipTransferCohort { +public class RaftActorLeadershipTransferCohort { + private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); + private final RaftActor raftActor; + private Cancellable newLeaderTimer; + private final List onCompleteCallbacks = new ArrayList<>(); + private long newLeaderTimeoutInMillis = 2000; + private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private boolean isTransferring; - protected RaftActorLeadershipTransferCohort(RaftActor raftActor) { + RaftActorLeadershipTransferCohort(RaftActor raftActor) { this.raftActor = raftActor; } + void init() { + RaftActorContext context = raftActor.getRaftActorContext(); + RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior(); + + transferTimer.start(); + + Optional roleChangeNotifier = raftActor.getRoleChangeNotifier(); + if (roleChangeNotifier.isPresent()) { + roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null, + currentBehavior.getLeaderPayloadVersion()), raftActor.self()); + } + + for (String peerId: context.getPeerIds()) { + ActorSelection followerActor = context.getPeerActorSelection(peerId); + if (followerActor != null) { + followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor()); + } + } + + raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) { + @Override + protected void doRun() { + doTransfer(); + } + + @Override + protected void doCancel() { + LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId()); + abortTransfer(); + } + }); + } + /** - * This method is invoked to start leadership transfer. + * This method is invoked to perform the leadership transfer. */ - public void startTransfer() { + @VisibleForTesting + void doTransfer() { RaftActorBehavior behavior = raftActor.getCurrentBehavior(); - if(behavior instanceof Leader) { + // Sanity check... + if (behavior instanceof Leader) { + isTransferring = true; ((Leader)behavior).transferLeadership(this); + } else { + LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId()); + finish(true); } } /** - * This method is invoked to abort leadership transfer. + * This method is invoked to abort leadership transfer on failure. */ public void abortTransfer() { - transferComplete(); + LOG.debug("{}: leader transfer aborted", raftActor.persistenceId()); + finish(false); } /** - * This method is invoked when leadership transfer is complete. + * This method is invoked when leadership transfer was carried out and complete. */ - public abstract void transferComplete(); + public void transferComplete() { + LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId()); + + // We'll give it a little time for the new leader to be elected to give the derived class a + // chance to possibly complete work that was suspended while we were transferring. The + // RequestVote message from the new leader candidate should cause us to step down as leader + // and convert to follower due to higher term. We should then get an AppendEntries heart + // beat with the new leader id. + + // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new + // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it + // safely run on the actor's thread dispatcher. + FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); + newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), + (Runnable) () -> { + LOG.debug("{}: leader not elected in time", raftActor.persistenceId()); + finish(true); + }, raftActor.getContext().system().dispatcher(), raftActor.self()); + } + + void onNewLeader(String newLeader) { + if (newLeader != null && newLeaderTimer != null) { + LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader); + newLeaderTimer.cancel(); + finish(true); + } + } + + private void finish(boolean success) { + isTransferring = false; + if (transferTimer.isRunning()) { + transferTimer.stop(); + if (success) { + LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(), + raftActor.getLeaderId(), transferTimer); + } else { + LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer); + } + } + + for (OnComplete onComplete: onCompleteCallbacks) { + if (success) { + onComplete.onSuccess(raftActor.self()); + } else { + onComplete.onFailure(raftActor.self()); + } + } + } + + void addOnComplete(OnComplete onComplete) { + onCompleteCallbacks.add(onComplete); + } + + boolean isTransferring() { + return isTransferring; + } + + @VisibleForTesting + void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { + this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; + } + + interface OnComplete { + void onSuccess(ActorRef raftActorRef); + + void onFailure(ActorRef raftActorRef); + } }