Bug 8606: Continue leadership transfer on pauseLeader timeout
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
index b087914deccf23837137ba36847d269b4bcfc7aa..0a22abb9657ff5813ce27a34736e78e5b7e3bc3a 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -53,15 +54,31 @@ import scala.concurrent.duration.FiniteDuration;
 public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
-    private final RaftActor raftActor;
-    private Cancellable newLeaderTimer;
+    static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
+
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
-    private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private final RaftActor raftActor;
+    private final String requestedFollowerId;
+
+    private long newLeaderTimeoutInMillis = 2000;
+    private Cancellable newLeaderTimer;
     private boolean isTransferring;
 
-    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+        this(raftActor, null);
+    }
+
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
         this.raftActor = raftActor;
+        this.requestedFollowerId = requestedFollowerId;
+
+        // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into
+        // account the variance.
+        final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
+                .getElectionTimeOutInterval().toMillis();
+        final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
+        newLeaderTimeoutInMillis = 2 * (electionTimeout + variance);
     }
 
     void init() {
@@ -79,20 +96,21 @@ public class RaftActorLeadershipTransferCohort {
         for (String peerId: context.getPeerIds()) {
             ActorSelection followerActor = context.getPeerActorSelection(peerId);
             if (followerActor != null) {
-                followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
+                followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
             }
         }
 
         raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
             @Override
             protected void doRun() {
+                LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
                 doTransfer();
             }
 
             @Override
             protected void doCancel() {
-                LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
-                abortTransfer();
+                LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
+                doTransfer();
             }
         });
     }
@@ -133,9 +151,8 @@ public class RaftActorLeadershipTransferCohort {
         // and convert to follower due to higher term. We should then get an AppendEntries heart
         // beat with the new leader id.
 
-        // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
-        // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
-        // safely run on the actor's thread dispatcher.
+        // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
+        // which executes it safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
             (Runnable) () -> {
@@ -144,7 +161,7 @@ public class RaftActorLeadershipTransferCohort {
             }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
-    void onNewLeader(String newLeader) {
+    void onNewLeader(final String newLeader) {
         if (newLeader != null && newLeaderTimer != null) {
             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
             newLeaderTimer.cancel();
@@ -152,7 +169,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    private void finish(boolean success) {
+    private void finish(final boolean success) {
         isTransferring = false;
         if (transferTimer.isRunning()) {
             transferTimer.stop();
@@ -173,7 +190,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    void addOnComplete(OnComplete onComplete) {
+    void addOnComplete(final OnComplete onComplete) {
         onCompleteCallbacks.add(onComplete);
     }
 
@@ -181,9 +198,14 @@ public class RaftActorLeadershipTransferCohort {
         return isTransferring;
     }
 
-    @VisibleForTesting
-    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
-        this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+    void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
+        if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
+            this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+        }
+    }
+
+    public Optional<String> getRequestedFollowerId() {
+        return Optional.fromNullable(requestedFollowerId);
     }
 
     interface OnComplete {