Bug 7391: Fix out-of-order LeaderStateChange events
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
index b83bfd370948dc18a3826633568e2a78eeb650ac..32790d0f47251af840193ebfe77fa51c4702ffd1 100644 (file)
@@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * A raft actor support class that participates in leadership transfer. An instance is created upon
  * initialization of leadership transfer.
+ *
  * <p>
  * The transfer process is as follows:
  * <ol>
@@ -35,13 +36,14 @@ import scala.concurrent.duration.FiniteDuration;
  *     their local RoleChangeNotifiers.</li>
  * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
  *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
- * <li>When the pause is complete, the {@link #run} method is called which in turn calls
- *     {@link Leader#transferLeadership}.</li>
+ * <li>When the pause is complete, the run method is called which in turn calls
+ *     {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
  * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
  * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
  *     possibly complete work that was suspended while we were transferring.</li>
  * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
  * </ol>
+ *
  * <p>
  * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
  * internal state.
@@ -52,16 +54,14 @@ public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
     private final RaftActor raftActor;
-    private final ActorRef replyTo;
     private Cancellable newLeaderTimer;
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
     private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
     private boolean isTransferring;
 
-    RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
+    RaftActorLeadershipTransferCohort(RaftActor raftActor) {
         this.raftActor = raftActor;
-        this.replyTo = replyTo;
     }
 
     void init() {
@@ -71,16 +71,15 @@ public class RaftActorLeadershipTransferCohort {
         transferTimer.start();
 
         Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
-        if(roleChangeNotifier.isPresent()) {
+        if (roleChangeNotifier.isPresent()) {
             roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
                     currentBehavior.getLeaderPayloadVersion()), raftActor.self());
         }
 
-        LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
-        for(String peerId: context.getPeerIds()) {
+        for (String peerId: context.getPeerIds()) {
             ActorSelection followerActor = context.getPeerActorSelection(peerId);
-            if(followerActor != null) {
-                followerActor.tell(leaderTransitioning, context.getActor());
+            if (followerActor != null) {
+                followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
             }
         }
 
@@ -105,7 +104,7 @@ public class RaftActorLeadershipTransferCohort {
     void doTransfer() {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
         // Sanity check...
-        if(behavior instanceof Leader) {
+        if (behavior instanceof Leader) {
             isTransferring = true;
             ((Leader)behavior).transferLeadership(this);
         } else {
@@ -139,17 +138,14 @@ public class RaftActorLeadershipTransferCohort {
         // safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
-                new Runnable() {
-                    @Override
-                    public void run() {
-                        LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
-                        finish(true);
-                    }
-                }, raftActor.getContext().system().dispatcher(), raftActor.self());
+            (Runnable) () -> {
+                LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+                finish(true);
+            }, raftActor.getContext().system().dispatcher(), raftActor.self());
     }
 
     void onNewLeader(String newLeader) {
-        if(newLeader != null && newLeaderTimer != null) {
+        if (newLeader != null && newLeaderTimer != null) {
             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
             newLeaderTimer.cancel();
             finish(true);
@@ -158,22 +154,21 @@ public class RaftActorLeadershipTransferCohort {
 
     private void finish(boolean success) {
         isTransferring = false;
-        if(transferTimer.isRunning()) {
+        if (transferTimer.isRunning()) {
             transferTimer.stop();
-            if(success) {
+            if (success) {
                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
-                        raftActor.getLeaderId(), transferTimer.toString());
+                        raftActor.getLeaderId(), transferTimer);
             } else {
-                LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
-                        transferTimer.toString());
+                LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
             }
         }
 
-        for(OnComplete onComplete: onCompleteCallbacks) {
-            if(success) {
-                onComplete.onSuccess(raftActor.self(), replyTo);
+        for (OnComplete onComplete: onCompleteCallbacks) {
+            if (success) {
+                onComplete.onSuccess(raftActor.self());
             } else {
-                onComplete.onFailure(raftActor.self(), replyTo);
+                onComplete.onFailure(raftActor.self());
             }
         }
     }
@@ -192,7 +187,8 @@ public class RaftActorLeadershipTransferCohort {
     }
 
     interface OnComplete {
-        void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
-        void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+        void onSuccess(ActorRef raftActorRef);
+
+        void onFailure(ActorRef raftActorRef);
     }
 }