import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.PoisonPill;
+import akka.actor.Status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.NoopPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
} else if (message instanceof Runnable) {
((Runnable)message).run();
} else if (message instanceof NoopPayload) {
- persistData(null, null, (NoopPayload)message, false);
+ persistData(null, null, (NoopPayload) message, false);
+ } else if (message instanceof RequestLeadership) {
+ onRequestLeadership((RequestLeadership) message);
} else if (!possiblyHandleBehaviorMessage(message)) {
handleNonRaftCommand(message);
}
}
+ private void onRequestLeadership(final RequestLeadership message) {
+ LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ if (!isLeader()) {
+ // non-leader cannot satisfy leadership request
+ LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ + " Current behavior: {}. Sending failure response",
+ persistenceId(), getCurrentBehavior().state());
+ message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ + message.getRequestedFollowerId()
+ + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+ return;
+ }
+
+ final String requestedFollowerId = message.getRequestedFollowerId();
+ final ActorRef replyTo = message.getReplyTo();
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ // sanity check
+ if (!requestedFollowerId.equals(getLeaderId())) {
+ onFailure(raftActorRef);
+ }
+
+ LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Success(null), getSelf());
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Failure(
+ new LeadershipTransferFailedException(
+ "Failed to transfer leadership to " + requestedFollowerId
+ + ". Follower is not ready to become leader")),
+ getSelf());
+ }
+ }, message.getRequestedFollowerId());
+ }
+
private boolean possiblyHandleBehaviorMessage(final Object message) {
final RaftActorBehavior currentBehavior = getCurrentBehavior();
final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
}
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
+ initiateLeadershipTransfer(onComplete, null);
+ }
+
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ final String followerId) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
if (leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef) {