/**
* A raft actor support class that participates in leadership transfer. An instance is created upon
* initialization of leadership transfer.
- * <p>
+ * <p/>
* The transfer process is as follows:
* <ol>
* <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
* possibly complete work that was suspended while we were transferring.</li>
* <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
* </ol>
- * <p>
+ * <p/>
* NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
* internal state.
*
transferTimer.start();
Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
- if(roleChangeNotifier.isPresent()) {
+ if (roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
currentBehavior.getLeaderPayloadVersion()), raftActor.self());
}
- for(String peerId: context.getPeerIds()) {
+ for (String peerId: context.getPeerIds()) {
ActorSelection followerActor = context.getPeerActorSelection(peerId);
- if(followerActor != null) {
+ if (followerActor != null) {
followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
}
}
void doTransfer() {
RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Sanity check...
- if(behavior instanceof Leader) {
+ if (behavior instanceof Leader) {
isTransferring = true;
((Leader)behavior).transferLeadership(this);
} else {
// safely run on the actor's thread dispatcher.
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
- (Runnable) () -> {
- LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
- finish(true);
- }, raftActor.getContext().system().dispatcher(), raftActor.self());
+ (Runnable) () -> {
+ LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ finish(true);
+ }, raftActor.getContext().system().dispatcher(), raftActor.self());
}
void onNewLeader(String newLeader) {
- if(newLeader != null && newLeaderTimer != null) {
+ if (newLeader != null && newLeaderTimer != null) {
LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
newLeaderTimer.cancel();
finish(true);
private void finish(boolean success) {
isTransferring = false;
- if(transferTimer.isRunning()) {
+ if (transferTimer.isRunning()) {
transferTimer.stop();
- if(success) {
+ if (success) {
LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
raftActor.getLeaderId(), transferTimer);
} else {
}
}
- for(OnComplete onComplete: onCompleteCallbacks) {
- if(success) {
+ for (OnComplete onComplete: onCompleteCallbacks) {
+ if (success) {
onComplete.onSuccess(raftActor.self());
} else {
onComplete.onFailure(raftActor.self());
interface OnComplete {
void onSuccess(ActorRef raftActorRef);
+
void onFailure(ActorRef raftActorRef);
}
}