Add ShutDown message to RaftActor to transfer leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 3afaad857204b75ebb1fe36ca9383d0fe9a0e76d..6851f6aab9ca4441ab79055c967d14c4c5335cc5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +125,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private RaftActorServerConfigurationSupport serverConfigurationSupport;
 
+    private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
+
+    private boolean shuttingDown;
+
     public RaftActor(String id, Map<String, String> peerAddresses,
          Optional<ConfigParams> configParams, short payloadVersion) {
 
@@ -253,11 +259,74 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             switchBehavior(((SwitchBehavior) message));
         } else if(message instanceof LeaderTransitioning) {
             onLeaderTransitioning();
+        } else if(message instanceof Shutdown) {
+            onShutDown();
+        } else if(message instanceof Runnable) {
+            ((Runnable)message).run();
         } else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
             switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
         }
     }
 
+    private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+        LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+        if(leadershipTransferInProgress == null) {
+            leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+            leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                    leadershipTransferInProgress = null;
+                }
+
+                @Override
+                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                    leadershipTransferInProgress = null;
+                }
+            });
+
+            leadershipTransferInProgress.addOnComplete(onComplete);
+            leadershipTransferInProgress.init();
+        } else {
+            LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+            leadershipTransferInProgress.addOnComplete(onComplete);
+        }
+    }
+
+    private void onShutDown() {
+        LOG.debug("{}: onShutDown", persistenceId());
+
+        if(shuttingDown) {
+            return;
+        }
+
+        shuttingDown = true;
+        if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+            initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+                @Override
+                public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+                    LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
+
+                @Override
+                public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+                    LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+                    raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+                }
+            });
+        } else if(currentBehavior.state() == RaftState.Leader) {
+            pauseLeader(new Runnable() {
+                @Override
+                public void run() {
+                    self().tell(PoisonPill.getInstance(), self());
+                }
+            });
+        } else {
+            self().tell(PoisonPill.getInstance(), self());
+        }
+    }
+
     private void onLeaderTransitioning() {
         LOG.debug("{}: onLeaderTransitioning", persistenceId());
         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
@@ -361,6 +430,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             }
 
             onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+
+            if(leadershipTransferInProgress != null) {
+                leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
+            }
         }
 
         if (roleChangeNotifier.isPresent() &&
@@ -458,6 +531,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return context.getId().equals(currentBehavior.getLeaderId());
     }
 
+    protected boolean isLeaderActive() {
+        return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown && leadershipTransferInProgress == null;
+    }
+
     /**
      * Derived actor can call getLeader if they need a reference to the Leader.
      * This would be useful for example in forwarding a request to an actor
@@ -638,6 +715,20 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     protected abstract Optional<ActorRef> getRoleChangeNotifier();
 
+    /**
+     * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+     * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+     * work prior to performing the operation. On completion of any work, the run method must be called to
+     * proceed with the given operation.
+     * <p>
+     * The default implementation immediately runs the operation.
+     *
+     * @param operation the operation to run
+     */
+    protected void pauseLeader(Runnable operation) {
+        operation.run();
+    }
+
     protected void onLeaderChanged(String oldLeader, String newLeader){};
 
     private String getLeaderAddress(){