Transfer leadership in PreLeader state
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 1c057d755369d50a2147cf7c3e2922783ae56fc8..7ec9f2743c7b6472af4e6ccf521e7c5b4f030e02 100644 (file)
@@ -12,6 +12,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.PoisonPill;
+import akka.actor.Status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -49,6 +50,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
@@ -119,8 +121,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
-    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
     private boolean shuttingDown;
 
     protected RaftActor(String id, Map<String, String> peerAddresses,
@@ -263,12 +263,53 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         } else if (message instanceof Runnable) {
             ((Runnable)message).run();
         } else if (message instanceof NoopPayload) {
-            persistData(null, null, (NoopPayload)message, false);
+            persistData(null, null, (NoopPayload) message, false);
+        } else if (message instanceof RequestLeadership) {
+            onRequestLeadership((RequestLeadership) message);
         } else if (!possiblyHandleBehaviorMessage(message)) {
             handleNonRaftCommand(message);
         }
     }
 
+    private void onRequestLeadership(final RequestLeadership message) {
+        LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+        if (!isLeader()) {
+            // non-leader cannot satisfy leadership request
+            LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+                    + " Current behavior: {}. Sending failure response",
+                    persistenceId(), getCurrentBehavior().state());
+            message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+                    + message.getRequestedFollowerId()
+                    + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+            return;
+        }
+
+        final String requestedFollowerId = message.getRequestedFollowerId();
+        final ActorRef replyTo = message.getReplyTo();
+        initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+            @Override
+            public void onSuccess(final ActorRef raftActorRef) {
+                // sanity check
+                if (!requestedFollowerId.equals(getLeaderId())) {
+                    onFailure(raftActorRef);
+                }
+
+                LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Success(null), getSelf());
+            }
+
+            @Override
+            public void onFailure(final ActorRef raftActorRef) {
+                LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+                replyTo.tell(new Status.Failure(
+                        new LeadershipTransferFailedException(
+                                "Failed to transfer leadership to " + requestedFollowerId
+                                        + ". Follower is not ready to become leader")),
+                        getSelf());
+            }
+        }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+    }
+
     private boolean possiblyHandleBehaviorMessage(final Object message) {
         final RaftActorBehavior currentBehavior = getCurrentBehavior();
         final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
@@ -285,25 +326,31 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return false;
     }
 
-    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+            @Nullable final String followerId, long newLeaderTimeoutInMillis) {
         LOG.debug("{}: Initiating leader transfer", persistenceId());
 
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         if (leadershipTransferInProgress == null) {
-            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+            leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
             leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
                 @Override
                 public void onSuccess(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
 
                 @Override
                 public void onFailure(ActorRef raftActorRef) {
-                    leadershipTransferInProgress = null;
+                    context.setRaftActorLeadershipTransferCohort(null);
                 }
             });
 
             leadershipTransferInProgress.addOnComplete(onComplete);
+
+            context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
             leadershipTransferInProgress.init();
+
         } else {
             LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
             leadershipTransferInProgress.addOnComplete(onComplete);
@@ -320,10 +367,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         shuttingDown = true;
 
         final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
-        if (currentBehavior.state() != RaftState.Leader) {
-            // For non-leaders shutdown is a no-op
-            self().tell(PoisonPill.getInstance(), self());
-            return;
+        switch (currentBehavior.state()) {
+            case Leader:
+            case PreLeader:
+                // Fall-through to more work
+                break;
+            default:
+                // For non-leaders shutdown is a no-op
+                self().tell(PoisonPill.getInstance(), self());
+                return;
         }
 
         if (context.hasFollowers()) {
@@ -339,7 +391,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
                     raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
                 }
-            });
+            }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
         } else {
             pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
                 @Override
@@ -469,6 +521,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
 
+            RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+                    context.getRaftActorLeadershipTransferCohort();
             if (leadershipTransferInProgress != null) {
                 leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
             }
@@ -606,7 +660,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 && !shuttingDown && !isLeadershipTransferInProgress();
     }
 
-    private boolean isLeadershipTransferInProgress() {
+    protected boolean isLeadershipTransferInProgress() {
+        RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
         return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
     }
 
@@ -860,7 +915,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         initializeBehavior();
                     }
                 }
-            });
+            }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
         }
     }