*
* @author Thomas Pantelis
*/
-public class RaftActorLeadershipTransferCohort implements Runnable {
+public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
private final RaftActor raftActor;
- private final ActorRef replyTo;
private Cancellable newLeaderTimer;
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+ private boolean isTransferring;
- RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
+ RaftActorLeadershipTransferCohort(RaftActor raftActor) {
this.raftActor = raftActor;
- this.replyTo = replyTo;
}
void init() {
currentBehavior.getLeaderPayloadVersion()), raftActor.self());
}
- LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
for(String peerId: context.getPeerIds()) {
ActorSelection followerActor = context.getPeerActorSelection(peerId);
if(followerActor != null) {
- followerActor.tell(leaderTransitioning, context.getActor());
+ followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
}
}
- raftActor.pauseLeader(this);
+ raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
+ @Override
+ protected void doRun() {
+ doTransfer();
+ }
+
+ @Override
+ protected void doCancel() {
+ LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
+ abortTransfer();
+ }
+ });
}
/**
- * This method is invoked to run the leadership transfer.
+ * This method is invoked to perform the leadership transfer.
*/
- @Override
- public void run() {
+ @VisibleForTesting
+ void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
if(behavior instanceof Leader) {
+ isTransferring = true;
((Leader)behavior).transferLeadership(this);
} else {
LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
// Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
// leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
- // safely run on actor's thread dispatcher.
+ // safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
new Runnable() {
}
private void finish(boolean success) {
+ isTransferring = false;
if(transferTimer.isRunning()) {
transferTimer.stop();
if(success) {
LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
- raftActor.getLeaderId(), transferTimer.toString());
+ raftActor.getLeaderId(), transferTimer);
} else {
- LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
- transferTimer.toString());
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
}
}
for(OnComplete onComplete: onCompleteCallbacks) {
if(success) {
- onComplete.onSuccess(raftActor.self(), replyTo);
+ onComplete.onSuccess(raftActor.self());
} else {
- onComplete.onFailure(raftActor.self(), replyTo);
+ onComplete.onFailure(raftActor.self());
}
}
}
onCompleteCallbacks.add(onComplete);
}
+ boolean isTransferring() {
+ return isTransferring;
+ }
+
@VisibleForTesting
void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
}
interface OnComplete {
- void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
- void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+ void onSuccess(ActorRef raftActorRef);
+ void onFailure(ActorRef raftActorRef);
}
}