- } else if (message instanceof FindLeader) {
- getSender().tell(
- new FindLeaderReply(getLeaderAddress()),
- getSelf()
- );
- } else if(message instanceof GetOnDemandRaftState) {
- onGetOnDemandRaftStats();
- } else if(!snapshotSupport.handleSnapshotMessage(message)) {
- reusableBehaviorStateHolder.init(getCurrentBehavior());
+ return false;
+ }
+
+ private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
+ final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
+ LOG.debug("{}: Initiating leader transfer", persistenceId());
+
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
+ if (leadershipTransferInProgress == null) {
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
+ leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
+ }
+
+ @Override
+ public void onFailure(final ActorRef raftActorRef) {
+ context.setRaftActorLeadershipTransferCohort(null);
+ }
+ });
+
+ leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
+ leadershipTransferInProgress.init();
+
+ } else {
+ LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+ leadershipTransferInProgress.addOnComplete(onComplete);
+ }
+ }
+
+ private void onShutDown() {
+ LOG.debug("{}: onShutDown", persistenceId());
+
+ if (shuttingDown) {
+ return;
+ }
+
+ shuttingDown = true;
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
+ switch (currentBehavior.state()) {
+ case Leader:
+ case PreLeader:
+ // Fall-through to more work
+ break;
+ default:
+ // For non-leaders shutdown is a no-op
+ self().tell(PoisonPill.getInstance(), self());
+ return;
+ }
+
+ if (context.hasFollowers()) {
+ initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
+ @Override
+ public void onSuccess(final ActorRef raftActorRef) {
+ LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+ raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
+ }