+ } else if (message instanceof SwitchBehavior) {
+ switchBehavior((SwitchBehavior) message);
+ } else if (message instanceof LeaderTransitioning) {
+ onLeaderTransitioning((LeaderTransitioning)message);
+ } else if (message instanceof Shutdown) {
+ onShutDown();
+ } else if (message instanceof Runnable) {
+ ((Runnable)message).run();
+ } else if (message instanceof NoopPayload) {
+ persistData(null, null, (NoopPayload) message, false);
+ } else if (message instanceof RequestLeadership) {
+ onRequestLeadership((RequestLeadership) message);
+ } else if (!possiblyHandleBehaviorMessage(message)) {
+ handleNonRaftCommand(message);
+ }
+ }
+
+ private void onRequestLeadership(final RequestLeadership message) {
+ LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ if (!isLeader()) {
+ // non-leader cannot satisfy leadership request
+ LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ + " Current behavior: {}. Sending failure response",
+ persistenceId(), getCurrentBehavior().state());
+ message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
+ + message.getRequestedFollowerId()
+ + ". RequestLeadership message was sent to non-leader " + persistenceId()), getSelf());
+ return;
+ }
+
+ final String requestedFollowerId = message.getRequestedFollowerId();
+ final ActorRef replyTo = message.getReplyTo();
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ // sanity check
+ if (!requestedFollowerId.equals(getLeaderId())) {
+ onFailure(raftActorRef);
+ }
+
+ LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Success(null), getSelf());
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ replyTo.tell(new Status.Failure(
+ new LeadershipTransferFailedException(
+ "Failed to transfer leadership to " + requestedFollowerId
+ + ". Follower is not ready to become leader")),
+ getSelf());
+ }
+ }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+ }
+
+ private boolean possiblyHandleBehaviorMessage(final Object message) {
+ final RaftActorBehavior currentBehavior = getCurrentBehavior();
+ final BehaviorState state = behaviorStateTracker.capture(currentBehavior);
+
+ // A behavior indicates that it processed the change by returning a reference to the next behavior
+ // to be used. A null return indicates it has not processed the message and we should be passing it to
+ // the subclass for handling.
+ final RaftActorBehavior nextBehavior = currentBehavior.handleMessage(getSender(), message);
+ if (nextBehavior != null) {
+ switchBehavior(state, nextBehavior);
+ return true;
+ }
+
+ return false;
+ }
+
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ @Nullable final String followerId, long newLeaderTimeoutInMillis) {
+ LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+ if (leadershipTransferInProgress == null) {
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
+ leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
+ }
+ });
+
+ leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
+ leadershipTransferInProgress.init();
+
+ } else {
+ LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ }
+ }
+
+ private void onShutDown() {
+ LOG.debug("{}: onShutDown", persistenceId());
+
+ if (shuttingDown) {
+ return;
+ }
+
+ shuttingDown = true;
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+ if (currentBehavior.state() != RaftState.Leader) {
+ // For non-leaders shutdown is a no-op
+ self().tell(PoisonPill.getInstance(), self());
+ return;
+ }
+
+ if (context.hasFollowers()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+
+ @Override
+ public void onFailure(ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }
+ }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
+ } else {
+ pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
+ @Override
+ protected void doRun() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+
+ @Override
+ protected void doCancel() {
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ });
+ }
+ }
+
+ private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
+ LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
+ Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
+ if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()
+ && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
+ roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
+ getCurrentBehavior().getLeaderPayloadVersion()), getSelf());