X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorLeadershipTransferCohort.java;h=45e497de46cc4bd2c29a3ffd0fd403df1f0d902c;hb=872a40f7ac1f2e14a5848329c0827f1265931f6e;hp=b087914deccf23837137ba36847d269b4bcfc7aa;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java index b087914dec..45e497de46 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorLeadershipTransferCohort.java @@ -16,6 +16,7 @@ import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; @@ -53,15 +54,31 @@ import scala.concurrent.duration.FiniteDuration; public class RaftActorLeadershipTransferCohort { private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class); - private final RaftActor raftActor; - private Cancellable newLeaderTimer; + static final long USE_DEFAULT_LEADER_TIMEOUT = -1; + private final List onCompleteCallbacks = new ArrayList<>(); - private long newLeaderTimeoutInMillis = 2000; private final Stopwatch transferTimer = Stopwatch.createUnstarted(); + private final RaftActor raftActor; + private final String requestedFollowerId; + + private long newLeaderTimeoutInMillis = 2000; + private Cancellable newLeaderTimer; private boolean isTransferring; - RaftActorLeadershipTransferCohort(RaftActor raftActor) { + RaftActorLeadershipTransferCohort(final RaftActor raftActor) { + this(raftActor, null); + } + + RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) { this.raftActor = raftActor; + this.requestedFollowerId = requestedFollowerId; + + // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into + // account the variance. + final long electionTimeout = raftActor.getRaftActorContext().getConfigParams() + .getElectionTimeOutInterval().toMillis(); + final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance(); + newLeaderTimeoutInMillis = 2 * (electionTimeout + variance); } void init() { @@ -79,7 +96,7 @@ public class RaftActorLeadershipTransferCohort { for (String peerId: context.getPeerIds()) { ActorSelection followerActor = context.getPeerActorSelection(peerId); if (followerActor != null) { - followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor()); + followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor()); } } @@ -133,9 +150,8 @@ public class RaftActorLeadershipTransferCohort { // and convert to follower due to higher term. We should then get an AppendEntries heart // beat with the new leader id. - // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new - // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it - // safely run on the actor's thread dispatcher. + // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor + // which executes it safely run on the actor's thread dispatcher. FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS); newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(), (Runnable) () -> { @@ -144,7 +160,7 @@ public class RaftActorLeadershipTransferCohort { }, raftActor.getContext().system().dispatcher(), raftActor.self()); } - void onNewLeader(String newLeader) { + void onNewLeader(final String newLeader) { if (newLeader != null && newLeaderTimer != null) { LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader); newLeaderTimer.cancel(); @@ -152,7 +168,7 @@ public class RaftActorLeadershipTransferCohort { } } - private void finish(boolean success) { + private void finish(final boolean success) { isTransferring = false; if (transferTimer.isRunning()) { transferTimer.stop(); @@ -173,7 +189,7 @@ public class RaftActorLeadershipTransferCohort { } } - void addOnComplete(OnComplete onComplete) { + void addOnComplete(final OnComplete onComplete) { onCompleteCallbacks.add(onComplete); } @@ -181,9 +197,14 @@ public class RaftActorLeadershipTransferCohort { return isTransferring; } - @VisibleForTesting - void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) { - this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; + void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) { + if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) { + this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis; + } + } + + public Optional getRequestedFollowerId() { + return Optional.fromNullable(requestedFollowerId); } interface OnComplete {