import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private RaftActorServerConfigurationSupport serverConfigurationSupport;
+ private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
+
+ private boolean shuttingDown;
+
public RaftActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams, short payloadVersion) {
switchBehavior(((SwitchBehavior) message));
} else if(message instanceof LeaderTransitioning) {
onLeaderTransitioning();
+ } else if(message instanceof Shutdown) {
+ onShutDown();
+ } else if(message instanceof Runnable) {
+ ((Runnable)message).run();
} else if(!snapshotSupport.handleSnapshotMessage(message, getSender())) {
switchBehavior(reusableSwitchBehaviorSupplier.handleMessage(getSender(), message));
}
}
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+ if(leadershipTransferInProgress == null) {
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+ leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ leadershipTransferInProgress = null;
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ leadershipTransferInProgress = null;
+ }
+ });
+
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ leadershipTransferInProgress.init();
+ } else {
+ LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ }
+ }
+
+ private void onShutDown() {
+ LOG.debug("{}: onShutDown", persistenceId());
+
+ if(shuttingDown) {
+ return;
+ }
+
+ shuttingDown = true;
+ if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+ });
+ } else if(currentBehavior.state() == RaftState.Leader) {
+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+ @Override
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
+ @Override
+ protected void doCancel() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ });
+ } else {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ }
+
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
}
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+
+ if(leadershipTransferInProgress != null) {
+ leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
+ }
}
if (roleChangeNotifier.isPresent() &&
return context.getId().equals(currentBehavior.getLeaderId());
}
+ protected boolean isLeaderActive() {
+ return currentBehavior.state() != RaftState.IsolatedLeader && !shuttingDown &&
+ !isLeadershipTransferInProgress();
+ }
+
+ private boolean isLeadershipTransferInProgress() {
+ return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
+ }
+
/**
* Derived actor can call getLeader if they need a reference to the Leader.
* This would be useful for example in forwarding a request to an actor
*/
protected abstract Optional<ActorRef> getRoleChangeNotifier();
+ /**
+ * This method is called prior to operations such as leadership transfer and actor shutdown when the leader
+ * must pause or stop its duties. This method allows derived classes to gracefully pause or finish current
+ * work prior to performing the operation. On completion of any work, the run method must be called on the
+ * given Runnable to proceed with the given operation. <b>Important:</b> the run method must be called on
+ * this actor's thread dispatcher as as it modifies internal state.
+ * <p>
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
+ */
+ protected void pauseLeader(Runnable operation) {
+ operation.run();
+ }
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private String getLeaderAddress(){