import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
/**
* A raft actor support class that participates in leadership transfer. An instance is created upon
* initialization of leadership transfer.
- * <p/>
+ *
+ * <p>
* The transfer process is as follows:
* <ol>
* <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
* their local RoleChangeNotifiers.</li>
* <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
* instance. This allows derived classes to perform work prior to transferring leadership.</li>
- * <li>When the pause is complete, the {@link #run} method is called which in turn calls
- * {@link Leader#transferLeadership}.</li>
+ * <li>When the pause is complete, the run method is called which in turn calls
+ * {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
* <li>The Leader calls {@link #transferComplete} on successful completion.</li>
* <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
* possibly complete work that was suspended while we were transferring.</li>
* <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
* </ol>
- * <p/>
+ *
+ * <p>
* NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
* internal state.
*
public class RaftActorLeadershipTransferCohort {
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
- private final RaftActor raftActor;
- private Cancellable newLeaderTimer;
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
- private long newLeaderTimeoutInMillis = 2000;
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
+ private final RaftActor raftActor;
+ private final String requestedFollowerId;
+
+ private long newLeaderTimeoutInMillis = 2000;
+ private Cancellable newLeaderTimer;
private boolean isTransferring;
- RaftActorLeadershipTransferCohort(RaftActor raftActor) {
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
+ this(raftActor, null);
+ }
+
+ RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
this.raftActor = raftActor;
+ this.requestedFollowerId = requestedFollowerId;
}
void init() {
for (String peerId: context.getPeerIds()) {
ActorSelection followerActor = context.getPeerActorSelection(peerId);
if (followerActor != null) {
- followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
+ followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
}
}
}, raftActor.getContext().system().dispatcher(), raftActor.self());
}
- void onNewLeader(String newLeader) {
+ void onNewLeader(final String newLeader) {
if (newLeader != null && newLeaderTimer != null) {
LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
newLeaderTimer.cancel();
}
}
- private void finish(boolean success) {
+ private void finish(final boolean success) {
isTransferring = false;
if (transferTimer.isRunning()) {
transferTimer.stop();
}
}
- void addOnComplete(OnComplete onComplete) {
+ void addOnComplete(final OnComplete onComplete) {
onCompleteCallbacks.add(onComplete);
}
}
@VisibleForTesting
- void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
+ void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
}
+ public Optional<String> getRequestedFollowerId() {
+ return Optional.fromNullable(requestedFollowerId);
+ }
+
interface OnComplete {
void onSuccess(ActorRef raftActorRef);