Bug 7407 - Add request leadership functionality to shards
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
index f3ffe139471b3dd034047fb461bd896eb8c861b1..59ae4d3069ba97085c3473b5581dbb608dde574f 100644 (file)
@@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -26,7 +27,8 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * A raft actor support class that participates in leadership transfer. An instance is created upon
  * initialization of leadership transfer.
- * <p/>
+ *
+ * <p>
  * The transfer process is as follows:
  * <ol>
  * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
@@ -35,14 +37,15 @@ import scala.concurrent.duration.FiniteDuration;
  *     their local RoleChangeNotifiers.</li>
  * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
  *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
- * <li>When the pause is complete, the {@link #run} method is called which in turn calls
- *     {@link Leader#transferLeadership}.</li>
+ * <li>When the pause is complete, the run method is called which in turn calls
+ *     {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
  * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
  * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
  *     possibly complete work that was suspended while we were transferring.</li>
  * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
  * </ol>
- * <p/>
+ *
+ * <p>
  * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
  * internal state.
  *
@@ -51,15 +54,22 @@ import scala.concurrent.duration.FiniteDuration;
 public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
-    private final RaftActor raftActor;
-    private Cancellable newLeaderTimer;
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
-    private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private final RaftActor raftActor;
+    private final String requestedFollowerId;
+
+    private long newLeaderTimeoutInMillis = 2000;
+    private Cancellable newLeaderTimer;
     private boolean isTransferring;
 
-    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+        this(raftActor, null);
+    }
+
+    RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
         this.raftActor = raftActor;
+        this.requestedFollowerId = requestedFollowerId;
     }
 
     void init() {
@@ -77,7 +87,7 @@ public class RaftActorLeadershipTransferCohort {
         for (String peerId: context.getPeerIds()) {
             ActorSelection followerActor = context.getPeerActorSelection(peerId);
             if (followerActor != null) {
-                followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
+                followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
             }
         }
 
@@ -142,7 +152,7 @@ public class RaftActorLeadershipTransferCohort {
             }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
-    void onNewLeader(String newLeader) {
+    void onNewLeader(final String newLeader) {
         if (newLeader != null && newLeaderTimer != null) {
             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
             newLeaderTimer.cancel();
@@ -150,7 +160,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    private void finish(boolean success) {
+    private void finish(final boolean success) {
         isTransferring = false;
         if (transferTimer.isRunning()) {
             transferTimer.stop();
@@ -171,7 +181,7 @@ public class RaftActorLeadershipTransferCohort {
         }
     }
 
-    void addOnComplete(OnComplete onComplete) {
+    void addOnComplete(final OnComplete onComplete) {
         onCompleteCallbacks.add(onComplete);
     }
 
@@ -180,10 +190,14 @@ public class RaftActorLeadershipTransferCohort {
     }
 
     @VisibleForTesting
-    void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+    void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
     }
 
+    public Optional<String> getRequestedFollowerId() {
+        return Optional.fromNullable(requestedFollowerId);
+    }
+
     interface OnComplete {
         void onSuccess(ActorRef raftActorRef);