Transfer leadership in PreLeader state
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 70a0b86952a3ea22e04921860ae7a2bd5ef97dc1..7ec9f2743c7b6472af4e6ccf521e7c5b4f030e02 100644 (file)
@@ -121,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
     protected RaftActor(String id, Map<String, String> peerAddresses,
@@ -309,7 +307,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                                         + ". Follower is not ready to become leader")),
                         getSelf());
             }
-        }, message.getRequestedFollowerId());
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
     }
 
     private boolean possiblyHandleBehaviorMessage(final Object message) {
@@ -328,30 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
-        initiateLeadershipTransfer(onComplete, null);
-    }
-
     private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
-                                            final String followerId) {
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         if (leadershipTransferInProgress == null) {
             leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
                 public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -368,10 +367,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         shuttingDown = true;
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        if (currentBehavior.state() != RaftState.Leader) {
-            // For non-leaders shutdown is a no-op
-            self().tell(PoisonPill.getInstance(), self());
-            return;
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
         }
 
         if (context.hasFollowers()) {
@@ -387,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -517,6 +521,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
             if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
@@ -654,7 +660,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -908,7 +915,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }