Bug 7391: Fix out-of-order LeaderStateChange events
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
index 77678e9f2423a2e8956e33a2483618d3b0d1d4dd..32790d0f47251af840193ebfe77fa51c4702ffd1 100644 (file)
  */
 package org.opendaylight.controller.cluster.raft;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.Cancellable;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A helper class that participates in raft actor leadership transfer. An instance is created upon
+ * A raft actor support class that participates in leadership transfer. An instance is created upon
  * initialization of leadership transfer.
+ *
+ * <p>
+ * The transfer process is as follows:
+ * <ol>
+ * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
+ *     clients that we no longer have a working leader.</li>
+ * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
+ *     their local RoleChangeNotifiers.</li>
+ * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
+ *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
+ * <li>When the pause is complete, the run method is called which in turn calls
+ *     {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
+ * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
+ * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
+ *     possibly complete work that was suspended while we were transferring.</li>
+ * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
+ * </ol>
+ *
  * <p>
- * NOTE: All methods on this class must be called on the actor's thread dispatcher as they modify internal state.
+ * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
+ * internal state.
  *
  * @author Thomas Pantelis
  */
-public abstract class RaftActorLeadershipTransferCohort {
+public class RaftActorLeadershipTransferCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
+
     private final RaftActor raftActor;
+    private Cancellable newLeaderTimer;
+    private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
+    private long newLeaderTimeoutInMillis = 2000;
+    private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private boolean isTransferring;
 
-    protected RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
         this.raftActor = raftActor;
     }
 
+    void init() {
+        RaftActorContext context = raftActor.getRaftActorContext();
+        RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
+
+        transferTimer.start();
+
+        Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
+        if (roleChangeNotifier.isPresent()) {
+            roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
+                    currentBehavior.getLeaderPayloadVersion()), raftActor.self());
+        }
+
+        for (String peerId: context.getPeerIds()) {
+            ActorSelection followerActor = context.getPeerActorSelection(peerId);
+            if (followerActor != null) {
+                followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
+            }
+        }
+
+        raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+            @Override
+            protected void doRun() {
+                doTransfer();
+            }
+
+            @Override
+            protected void doCancel() {
+                LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
+                abortTransfer();
+            }
+        });
+    }
+
     /**
-     * This method is invoked to start leadership transfer.
+     * This method is invoked to perform the leadership transfer.
      */
-    public void startTransfer() {
+    @VisibleForTesting
+    void doTransfer() {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
-        if(behavior instanceof Leader) {
+        // Sanity check...
+        if (behavior instanceof Leader) {
+            isTransferring = true;
             ((Leader)behavior).transferLeadership(this);
+        } else {
+            LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
+            finish(true);
         }
     }
 
     /**
-     * This method is invoked to abort leadership transfer.
+     * This method is invoked to abort leadership transfer on failure.
      */
     public void abortTransfer() {
-        transferComplete();
+        LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
+        finish(false);
     }
 
     /**
-     * This method is invoked when leadership transfer is complete.
+     * This method is invoked when leadership transfer was carried out and complete.
      */
-    public abstract void transferComplete();
+    public void transferComplete() {
+        LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+
+        // We'll give it a little time for the new leader to be elected to give the derived class a
+        // chance to possibly complete work that was suspended while we were transferring. The
+        // RequestVote message from the new leader candidate should cause us to step down as leader
+        // and convert to follower due to higher term. We should then get an AppendEntries heart
+        // beat with the new leader id.
+
+        // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
+        // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
+        // safely run on the actor's thread dispatcher.
+        FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
+        newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
+            (Runnable) () -> {
+                LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+                finish(true);
+            }, raftActor.getContext().system().dispatcher(), raftActor.self());
+    }
+
+    void onNewLeader(String newLeader) {
+        if (newLeader != null && newLeaderTimer != null) {
+            LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+            newLeaderTimer.cancel();
+            finish(true);
+        }
+    }
+
+    private void finish(boolean success) {
+        isTransferring = false;
+        if (transferTimer.isRunning()) {
+            transferTimer.stop();
+            if (success) {
+                LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+                        raftActor.getLeaderId(), transferTimer);
+            } else {
+                LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
+            }
+        }
+
+        for (OnComplete onComplete: onCompleteCallbacks) {
+            if (success) {
+                onComplete.onSuccess(raftActor.self());
+            } else {
+                onComplete.onFailure(raftActor.self());
+            }
+        }
+    }
+
+    void addOnComplete(OnComplete onComplete) {
+        onCompleteCallbacks.add(onComplete);
+    }
+
+    boolean isTransferring() {
+        return isTransferring;
+    }
+
+    @VisibleForTesting
+    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+        this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+    }
+
+    interface OnComplete {
+        void onSuccess(ActorRef raftActorRef);
+
+        void onFailure(ActorRef raftActorRef);
+    }
 }