2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
21 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
22 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
28 * A raft actor support class that participates in leadership transfer. An instance is created upon
29 * initialization of leadership transfer.
32 * The transfer process is as follows:
34 * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
35 * clients that we no longer have a working leader.</li>
36 * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
37 * their local RoleChangeNotifiers.</li>
38 * <li>Call {@link RaftActor#pauseLeader} passing a {@code TimedRunnable} created with the election timeout
39 * interval. This allows derived classes to perform work prior to transferring leadership.</li>
40 * <li>When the pause completes, or times out, the transfer continues: if this actor is still the leader,
41 * {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)} is called; otherwise the transfer is skipped.</li>
42 * <li>The Leader calls {@link #transferComplete} on successful completion; {@link #abortTransfer} is invoked instead on failure.</li>
43 * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
44 * possibly complete work that was suspended while we were transferring.</li>
45 * <li>On notification of the new leader from the RaftActor, or when the wait for it times out, notify the {@link OnComplete} callbacks.</li>
49 * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify internal state that is not synchronized.
52 * @author Thomas Pantelis
54 public class RaftActorLeadershipTransferCohort {
55 private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
// Sentinel for setNewLeaderTimeoutInMillis meaning "keep the timeout computed in the constructor".
57 static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
// Callbacks notified (onSuccess/onFailure) by finish(), in registration order.
59 private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
// Measures the overall transfer duration; started when the transfer begins and logged when it finishes.
60 private final Stopwatch transferTimer = Stopwatch.createUnstarted();
61 private final RaftActor raftActor;
// Optional requested follower id (nullable); exposed via getRequestedFollowerId().
62 private final String requestedFollowerId;
// How long transferComplete() waits for a new leader to be reported.
// NOTE(review): the visible constructors always overwrite this 2000 ms initializer (the one-argument
// constructor delegates to the two-argument one), so the initializer value is never used.
64 private long newLeaderTimeoutInMillis = 2000;
// Timer scheduled by transferComplete(); cancelled by onNewLeader().
65 private Cancellable newLeaderTimer;
// True while the Leader performs the transfer: set before Leader#transferLeadership, cleared in finish().
66 private boolean isTransferring;
/**
 * Creates a cohort with no requested follower (the follower id is null).
 *
 * @param raftActor the RaftActor participating in the leadership transfer
 */
68 RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
69 this(raftActor, null);
/**
 * Creates a cohort for the given RaftActor and computes the default wait for a new leader from the
 * actor's election configuration.
 *
 * @param raftActor the RaftActor participating in the leadership transfer
 * @param requestedFollowerId id of the requested follower, or null if none was requested; exposed via
 *        {@link #getRequestedFollowerId()}
 */
72 RaftActorLeadershipTransferCohort(final RaftActor raftActor, final @Nullable String requestedFollowerId) {
73 this.raftActor = raftActor;
74 this.requestedFollowerId = requestedFollowerId;
76 // Wait an election timeout period for a new leader to be elected, plus a cushion for the variance:
77 // the default wait is 2 * (election timeout + election time variance).
// NOTE(review): the variance is added to a millisecond value, so it is assumed to be in milliseconds as
// well; confirm against the ConfigParams contract.
78 final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
79 .getElectionTimeOutInterval().toMillis();
80 final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
81 newLeaderTimeoutInMillis = 2 * (electionTimeout + variance);
// NOTE(review): the declaration of the method containing these statements is not visible in this excerpt.
// These statements start the transfer, following the steps in the class documentation.
85 RaftActorContext context = raftActor.getRaftActorContext();
86 RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
// Time the whole transfer; the elapsed time is logged when the transfer finishes.
88 transferTimer.start();
// Tell the local RoleChangeNotifier, if present, that there is currently no leader (null leader id).
90 Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
91 if (roleChangeNotifier.isPresent()) {
92 roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
93 currentBehavior.getLeaderPayloadVersion()), raftActor.self());
// Send LeaderTransitioning to every follower whose actor selection is known, so each can notify its own
// RoleChangeNotifier.
96 for (String peerId: context.getPeerIds()) {
97 ActorSelection followerActor = context.getPeerActorSelection(peerId);
98 if (followerActor != null) {
99 followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
// Give the derived RaftActor a chance to pause work before the transfer. The TimedRunnable is created with
// the election timeout interval. doRun logs that the pause completed and doCancel logs that it timed out;
// both messages say the transfer continues.
// NOTE(review): the calls that continue the transfer from these callbacks are not visible in this excerpt.
// Confirm that both paths proceed to the transfer step.
103 raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
105 protected void doRun() {
106 LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
111 protected void doCancel() {
112 LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
119 * Performs the leadership transfer if this actor is still the leader; otherwise the transfer is skipped.
123 RaftActorBehavior behavior = raftActor.getCurrentBehavior();
// Leadership may have been lost while the leader was paused, so re-check the current behavior.
125 if (behavior instanceof Leader) {
// Mark the transfer in progress before handing this cohort to the Leader. Per the class documentation,
// the Leader reports back via transferComplete() (or abortTransfer() on failure).
126 isTransferring = true;
127 ((Leader)behavior).transferLeadership(this);
// NOTE(review): the else-branch line around this log statement is not visible in this excerpt. Confirm
// that the skip path still completes the cohort (e.g. via finish(...)) so OnComplete callbacks are notified.
129 LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
135 * Aborts the leadership transfer; invoked when the transfer fails.
137 public void abortTransfer() {
// NOTE(review): only the log statement is visible in this excerpt. Confirm this path calls finish(false),
// which logs the failure, calls raftActor.unpauseLeader() and runs the onFailure callbacks.
138 LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
143 * Invoked when the leadership transfer has been carried out; starts waiting for the new leader to be reported.
145 public void transferComplete() {
146 LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
148 // We'll give it a little time for the new leader to be elected to give the derived class a
149 // chance to possibly complete work that was suspended while we were transferring. The
150 // RequestVote message from the new leader candidate should cause us to step down as leader
151 // and convert to follower due to higher term. We should then get an AppendEntries
152 // heartbeat with the new leader id.
154 // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor,
155 // which executes it safely on the actor's thread dispatcher.
// The wait is the constructor-computed default unless overridden via setNewLeaderTimeoutInMillis.
// NOTE(review): the lines that open the scheduled Runnable, and any call after the "not elected in time" log,
// are not visible in this excerpt. Confirm that the timeout path finishes the cohort and notifies callbacks.
156 FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
157 newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
159 LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
161 }, raftActor.getContext().system().dispatcher(), raftActor.self());
/**
 * Handles notification of a leader change from the RaftActor. When a non-null leader is reported after
 * {@link #transferComplete()} has scheduled the new-leader timer, the timer is cancelled.
 *
 * @param newLeader id of the new leader; a null id does not pass the guard below
 */
164 void onNewLeader(final String newLeader) {
165 if (newLeader != null && newLeaderTimer != null) {
166 LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
// NOTE(review): newLeaderTimer is not reset to null in the visible lines, so a later non-null leader
// notification would pass the guard again; confirm that repeated calls are harmless.
167 newLeaderTimer.cancel();
/**
 * Ends the leadership transfer: clears the transferring flag, stops the transfer timer if it is still running,
 * logs the outcome and notifies every registered {@link OnComplete} callback.
 *
 * @param success whether the transfer is considered successful; the failure path presumably also calls
 *        {@code raftActor.unpauseLeader()} (the branch lines are not visible in this excerpt; confirm)
 */
172 private void finish(final boolean success) {
173 isTransferring = false;
174 if (transferTimer.isRunning()) {
175 transferTimer.stop();
// NOTE(review): the if (success) / else lines are not visible in this excerpt. They choose between the two
// log statements below, and between onSuccess/onFailure in the loop; the pairing is inferred from the messages.
177 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
178 raftActor.getLeaderId(), transferTimer);
180 LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
// Presumably undoes the earlier pauseLeader(), since leadership was not handed over.
181 raftActor.unpauseLeader();
// Callbacks are invoked in registration order and receive this RaftActor's own ActorRef.
185 for (OnComplete onComplete: onCompleteCallbacks) {
187 onComplete.onSuccess(raftActor.self());
189 onComplete.onFailure(raftActor.self());
/**
 * Registers a callback to be notified when the transfer finishes.
 *
 * @param onComplete the callback to add; callbacks are invoked in registration order
 */
194 void addOnComplete(final OnComplete onComplete) {
195 onCompleteCallbacks.add(onComplete);
/**
 * Returns whether the Leader is currently performing the transfer. The flag is set just before
 * {@link Leader#transferLeadership} is invoked and cleared when the transfer finishes.
 *
 * @return true while the transfer is in progress on the Leader
 */
198 boolean isTransferring() {
199 return isTransferring;
/**
 * Overrides how long {@link #transferComplete()} waits for a new leader to be reported.
 *
 * @param newLeaderTimeoutInMillis the wait in milliseconds, or {@link #USE_DEFAULT_LEADER_TIMEOUT} to keep
 *        the value computed in the constructor
 */
202 void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
// NOTE(review): only the sentinel is special-cased; other negative or zero values are stored as-is.
203 if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
204 this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
/**
 * Returns the follower id supplied at construction.
 *
 * @return the requested follower id, or an absent Optional if none was supplied
 */
208 public Optional<String> getRequestedFollowerId() {
209 return Optional.fromNullable(requestedFollowerId);
/**
 * Callback notified when the leadership transfer finishes. Each method is passed the RaftActor's own
 * {@link ActorRef}.
 */
212 interface OnComplete {
// Called when the transfer is considered successful.
213 void onSuccess(ActorRef raftActorRef);
// Called when the transfer is considered failed.
215 void onFailure(ActorRef raftActorRef);