LOG.debug("{}: Initiating leader transfer", persistenceId());
if(leadershipTransferInProgress == null) {
- leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this, getSender());
+ leadershipTransferInProgress = new RaftActorLeadershipTransferCohort(this);
leadershipTransferInProgress.addOnComplete(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
leadershipTransferInProgress = null;
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
leadershipTransferInProgress = null;
}
});
if (context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
if (isLeader()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
- public void onSuccess(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onSuccess(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
ensureFollowerState();
}
@Override
- public void onFailure(ActorRef raftActorRef, ActorRef replyTo) {
+ public void onFailure(ActorRef raftActorRef) {
LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
ensureFollowerState();
}
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
private final RaftActor raftActor;
- private final ActorRef replyTo;
private Cancellable newLeaderTimer;
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
private boolean isTransferring;
- RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
+ RaftActorLeadershipTransferCohort(RaftActor raftActor) {
this.raftActor = raftActor;
- this.replyTo = replyTo;
}
void init() {
for(OnComplete onComplete: onCompleteCallbacks) {
if(success) {
- onComplete.onSuccess(raftActor.self(), replyTo);
+ onComplete.onSuccess(raftActor.self());
} else {
- onComplete.onFailure(raftActor.self(), replyTo);
+ onComplete.onFailure(raftActor.self());
}
}
}
}
interface OnComplete {
- void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
- void onFailure(ActorRef raftActorRef, ActorRef replyTo);
+ void onSuccess(ActorRef raftActorRef);
+ void onFailure(ActorRef raftActorRef);
}
}
/**
* Message sent to a raft actor to shutdown gracefully. If it's the leader it will transfer leadership to a
- * follower. As its last act, the actor self-destructs via a PoisonPill.
+ * follower. As its last act, the actor self-destructs via a PoisonPill. This message should only be used with
+ * Patterns.gracefulStop().
*
* @author Thomas Pantelis
*/
mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.builder().id(persistenceId).config(
config).pauseLeaderFunction(pauseLeaderFunction).props().withDispatcher(Dispatchers.DefaultDispatcherId()),
persistenceId).underlyingActor();
- cohort = new RaftActorLeadershipTransferCohort(mockRaftActor, null);
+ cohort = new RaftActorLeadershipTransferCohort(mockRaftActor);
cohort.addOnComplete(onComplete);
mockRaftActor.waitForInitializeBehaviorComplete();
}
cohort.setNewLeaderTimeoutInMillis(20000);
cohort.onNewLeader("new-leader");
- verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+ verify(onComplete, never()).onSuccess(mockRaftActor.self());
cohort.transferComplete();
cohort.onNewLeader(null);
- verify(onComplete, never()).onSuccess(mockRaftActor.self(), null);
+ verify(onComplete, never()).onSuccess(mockRaftActor.self());
cohort.onNewLeader("new-leader");
- verify(onComplete).onSuccess(mockRaftActor.self(), null);
+ verify(onComplete).onSuccess(mockRaftActor.self());
}
@Test
setup("testNewLeaderTimeout");
cohort.setNewLeaderTimeoutInMillis(200);
cohort.transferComplete();
- verify(onComplete, timeout(3000)).onSuccess(mockRaftActor.self(), null);
+ verify(onComplete, timeout(3000)).onSuccess(mockRaftActor.self());
}
@Test
config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
setup("testNotLeaderOnDoTransfer");
cohort.doTransfer();
- verify(onComplete).onSuccess(mockRaftActor.self(), null);
+ verify(onComplete).onSuccess(mockRaftActor.self());
}
@Test
public void testAbortTransfer() {
setup("testAbortTransfer");
cohort.abortTransfer();
- verify(onComplete).onFailure(mockRaftActor.self(), null);
+ verify(onComplete).onFailure(mockRaftActor.self());
}
@Test
setup("testPauseLeaderTimeout");
cohort.init();
- verify(onComplete, timeout(2000)).onFailure(mockRaftActor.self(), null);
+ verify(onComplete, timeout(2000)).onFailure(mockRaftActor.self());
}
}