+ public void transferComplete() {
+ LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+
+ // We'll give it a little time for the new leader to be elected to give the derived class a
+ // chance to possibly complete work that was suspended while we were transferring. The
+ // RequestVote message from the new leader candidate should cause us to step down as leader
+ // and convert to follower due to higher term. We should then get an AppendEntries heart
+ // beat with the new leader id.
+
+ // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
+ // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
+ // safely run on the actor's thread dispatcher.
+ FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
+ newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
+ new Runnable() {
+ @Override
+ public void run() {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ }
+
+ void onNewLeader(String newLeader) {
+ if(newLeader != null && newLeaderTimer != null) {
+ LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+ newLeaderTimer.cancel();
+ finish(true);
+ }
+ }
+
+ private void finish(boolean success) {
+ isTransferring = false;
+ if(transferTimer.isRunning()) {
+ transferTimer.stop();
+ if(success) {
+ LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+ raftActor.getLeaderId(), transferTimer.toString());
+ } else {
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
+ transferTimer.toString());
+ }
+ }
+
+ for(OnComplete onComplete: onCompleteCallbacks) {
+ if(success) {
+ onComplete.onSuccess(raftActor.self(), replyTo);
+ } else {
+ onComplete.onFailure(raftActor.self(), replyTo);
+ }
+ }
+ }
+
+ void addOnComplete(OnComplete onComplete) {
+ onCompleteCallbacks.add(onComplete);
+ }
+
+ boolean isTransferring() {
+ return isTransferring;
+ }
+
+ @VisibleForTesting
+ void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+ this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
+ }
+
+ interface OnComplete {
+ void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
+ void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+ }