+ public void transferComplete() {
+ LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+
+ // We'll give it a little time for the new leader to be elected to give the derived class a
+ // chance to possibly complete work that was suspended while we were transferring. The
+ // RequestVote message from the new leader candidate should cause us to step down as leader
+ // and convert to follower due to higher term. We should then get an AppendEntries heart
+ // beat with the new leader id.
+
+ // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
+ // which executes it safely run on the actor's thread dispatcher.
+ FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
+ newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
+ (Runnable) () -> {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ }
+
+ void onNewLeader(final String newLeader) {
+ if (newLeader != null && newLeaderTimer != null) {
+ LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+ newLeaderTimer.cancel();
+ finish(true);
+ }
+ }
+
+ private void finish(final boolean success) {
+ isTransferring = false;
+ if (transferTimer.isRunning()) {
+ transferTimer.stop();
+ if (success) {
+ LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+ raftActor.getLeaderId(), transferTimer);
+ } else {
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
+ raftActor.unpauseLeader();
+ }
+ }
+
+ for (OnComplete onComplete: onCompleteCallbacks) {
+ if (success) {
+ onComplete.onSuccess(raftActor.self());
+ } else {
+ onComplete.onFailure(raftActor.self());
+ }
+ }
+ }
+
+ void addOnComplete(final OnComplete onComplete) {
+ onCompleteCallbacks.add(onComplete);
+ }
+
+ boolean isTransferring() {
+ return isTransferring;
+ }
+
+ void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
+ if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
+ this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ }
+ }
+
+ public Optional<String> getRequestedFollowerId() {
+ return Optional.ofNullable(requestedFollowerId);
+ }
+
+ interface OnComplete {
+ void onSuccess(ActorRef raftActorRef);
+
+ void onFailure(ActorRef raftActorRef);
+ }