2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
20 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
21 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.duration.FiniteDuration;
27 * A raft actor support class that participates in leadership transfer. An instance is created upon
28 * initialization of leadership transfer.
30 * The transfer process is as follows:
32 * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
33 * clients that we no longer have a working leader.</li>
34 * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
35 * their local RoleChangeNotifiers.</li>
36 * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
37 * instance. This allows derived classes to perform work prior to transferring leadership.</li>
38 * <li>When the pause is complete, the {@link #run} method is called which in turn calls
39 * {@link Leader#transferLeadership}.</li>
40 * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
41 * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
42 * possibly complete work that was suspended while we were transferring.</li>
43 * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
46 * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify internal state.
49 * @author Thomas Pantelis
51 public class RaftActorLeadershipTransferCohort implements Runnable {
/** Logger shared by all cohort instances. */
private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);

/** The raft actor on whose behalf leadership is being transferred. */
private final RaftActor raftActor;
/** Actor reference passed, together with the raft actor's own ref, to every {@link OnComplete} callback. */
private final ActorRef replyTo;
/** Handle of the scheduled "new leader" timeout; assigned in transferComplete() and cancelled in onNewLeader(). */
private Cancellable newLeaderTimer;
/** Callbacks registered via addOnComplete(); each is notified from finish(). */
private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
/** How long transferComplete() waits for a new leader before timing out, in milliseconds (default 2000). */
private long newLeaderTimeoutInMillis = 2000;
/** Measures the duration of the transfer; created unstarted and stopped in finish(). */
private final Stopwatch transferTimer = Stopwatch.createUnstarted();
/** Value reported by isTransferring(); set once the transfer is handed to the Leader, cleared in finish(). */
private boolean isTransferring;
/**
 * Creates a cohort bound to the given raft actor.
 *
 * @param raftActor the actor whose leadership is to be transferred
 * @param replyTo the actor reference handed to the {@link OnComplete} callbacks on completion
 */
RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
    this.replyTo = replyTo;
    this.raftActor = raftActor;
}
68 RaftActorContext context = raftActor.getRaftActorContext();
69 RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
71 transferTimer.start();
73 Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
74 if(roleChangeNotifier.isPresent()) {
75 roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
76 currentBehavior.getLeaderPayloadVersion()), raftActor.self());
79 LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
80 for(String peerId: context.getPeerIds()) {
81 ActorSelection followerActor = context.getPeerActorSelection(peerId);
82 if(followerActor != null) {
83 followerActor.tell(leaderTransitioning, context.getActor());
87 raftActor.pauseLeader(this);
91 * This method is invoked to run the leadership transfer.
95 RaftActorBehavior behavior = raftActor.getCurrentBehavior();
97 if(behavior instanceof Leader) {
98 isTransferring = true;
99 ((Leader)behavior).transferLeadership(this);
101 LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
107 * This method is invoked to abort leadership transfer on failure.
109 public void abortTransfer() {
110 LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
115 * This method is invoked when leadership transfer was carried out and complete.
117 public void transferComplete() {
118 LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
120 // We'll give it a little time for the new leader to be elected to give the derived class a
121 // chance to possibly complete work that was suspended while we were transferring. The
122 // RequestVote message from the new leader candidate should cause us to step down as leader
123 // and convert to follower due to higher term. We should then get an AppendEntries heart
124 // beat with the new leader id.
126 // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
127 // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
128 // safely run on actor's thread dispatcher.
129 FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
130 newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
134 LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
137 }, raftActor.getContext().system().dispatcher(), raftActor.self());
140 void onNewLeader(String newLeader) {
141 if(newLeader != null && newLeaderTimer != null) {
142 LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
143 newLeaderTimer.cancel();
148 private void finish(boolean success) {
149 isTransferring = false;
150 if(transferTimer.isRunning()) {
151 transferTimer.stop();
153 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
154 raftActor.getLeaderId(), transferTimer.toString());
156 LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
157 transferTimer.toString());
161 for(OnComplete onComplete: onCompleteCallbacks) {
163 onComplete.onSuccess(raftActor.self(), replyTo);
165 onComplete.onFailure(raftActor.self(), replyTo);
170 void addOnComplete(OnComplete onComplete) {
171 onCompleteCallbacks.add(onComplete);
174 boolean isTransferring() {
175 return isTransferring;
179 void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
180 this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
183 interface OnComplete {
184 void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
185 void onFailure(ActorRef raftActorRef, ActorRef replyTo);