Turn LeaderTransitioning into a singleton
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
index 623fa4902ca6fc45ce5d86630ef620afde3b9c10..d5dfcf3943387149a179a731a6e51e04819c23cc 100644 (file)
@@ -48,7 +48,7 @@ import scala.concurrent.duration.FiniteDuration;
  *
  * @author Thomas Pantelis
  */
-public class RaftActorLeadershipTransferCohort implements Runnable {
+public class RaftActorLeadershipTransferCohort {
     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
 
     private final RaftActor raftActor;
@@ -57,6 +57,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
     private long newLeaderTimeoutInMillis = 2000;
     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+    private boolean isTransferring;
 
     RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
         this.raftActor = raftActor;
@@ -75,25 +76,36 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
                     currentBehavior.getLeaderPayloadVersion()), raftActor.self());
         }
 
-        LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
         for(String peerId: context.getPeerIds()) {
             ActorSelection followerActor = context.getPeerActorSelection(peerId);
             if(followerActor != null) {
-                followerActor.tell(leaderTransitioning, context.getActor());
+                followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
             }
         }
 
-        raftActor.pauseLeader(this);
+        raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+            @Override
+            protected void doRun() {
+                doTransfer();
+            }
+
+            @Override
+            protected void doCancel() {
+                LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
+                abortTransfer();
+            }
+        });
     }
 
     /**
-     * This method is invoked to run the leadership transfer.
+     * This method is invoked to perform the leadership transfer.
      */
-    @Override
-    public void run() {
+    @VisibleForTesting
+    void doTransfer() {
         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
         // Sanity check...
         if(behavior instanceof Leader) {
+            isTransferring = true;
             ((Leader)behavior).transferLeadership(this);
         } else {
             LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
@@ -123,7 +135,7 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
 
         // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
         // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
-        // safely run on actor's thread dispatcher.
+        // safely run on the actor's thread dispatcher.
         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
                 new Runnable() {
@@ -144,13 +156,14 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
     }
 
     private void finish(boolean success) {
+        isTransferring = false;
         if(transferTimer.isRunning()) {
             transferTimer.stop();
             if(success) {
                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
                         raftActor.getLeaderId(), transferTimer.toString());
             } else {
-                LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+                LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
                         transferTimer.toString());
             }
         }
@@ -168,6 +181,10 @@ public class RaftActorLeadershipTransferCohort implements Runnable {
         onCompleteCallbacks.add(onComplete);
     }
 
+    boolean isTransferring() {
+        return isTransferring;
+    }
+
     @VisibleForTesting
     void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;