private RaftActorServerConfigurationSupport serverConfigurationSupport;
- private RaftActorLeadershipTransferCohort leadershipTransferInProgress;
-
private boolean shuttingDown;
protected RaftActor(String id, Map<String, String> peerAddresses,
+ ". Follower is not ready to become leader")),
getSelf());
}
- }, message.getRequestedFollowerId());
+ }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
private boolean possiblyHandleBehaviorMessage(final Object message) {
return false;
}
- private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete) {
- initiateLeadershipTransfer(onComplete, null);
- }
-
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
- final String followerId) {
+ @Nullable final String followerId, long newLeaderTimeoutInMillis) {
LOG.debug("{}: Initiating leader transfer", persistenceId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress == null) {
leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, followerId);
+ leadershipTransferInProgress.setNewLeaderTimeoutInMillis(newLeaderTimeoutInMillis);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
@Override
public void onFailure(ActorRef raftActorRef) {
- leadershipTransferInProgress = null;
+ context.setRaftActorLeadershipTransferCohort(null);
}
});
leadershipTransferInProgress.addOnComplete(onComplete);
+
+ context.setRaftActorLeadershipTransferCohort(leadershipTransferInProgress);
leadershipTransferInProgress.init();
+
} else {
LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
leadershipTransferInProgress.addOnComplete(onComplete);
shuttingDown = true;
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- if (currentBehavior.state() != RaftState.Leader) {
- // For non-leaders shutdown is a no-op
- self().tell(PoisonPill.getInstance(), self());
- return;
+ switch (currentBehavior.state()) {
+ case Leader:
+ case PreLeader:
+ // Fall-through to more work
+ break;
+ default:
+ // For non-leaders shutdown is a no-op
+ self().tell(PoisonPill.getInstance(), self());
+ return;
}
if (context.hasFollowers()) {
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
- });
+ }, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
} else {
pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), this) {
@Override
onLeaderChanged(lastValidLeaderId, currentBehavior.getLeaderId());
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress =
+ context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
&& !shuttingDown && !isLeadershipTransferInProgress();
}
- private boolean isLeadershipTransferInProgress() {
+ protected boolean isLeadershipTransferInProgress() {
+ RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
return leadershipTransferInProgress != null && leadershipTransferInProgress.isTransferring();
}
initializeBehavior();
}
}
- });
+ }, null, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
}