7105714b0ba8e807fd7cc137b928a3ad0a875539
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
20 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
21 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.duration.FiniteDuration;
25
26 /**
27  * A raft actor support class that participates in leadership transfer. An instance is created upon
28  * initialization of leadership transfer.
29  * <p>
30  * The transfer process is as follows:
31  * <ol>
32  * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
33  *     clients that we no longer have a working leader.</li>
34  * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
35  *     their local RoleChangeNotifiers.</li>
36  * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
37  *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
38  * <li>When the pause is complete, the {@link #run} method is called which in turn calls
39  *     {@link Leader#transferLeadership}.</li>
40  * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
41  * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
42  *     possibly complete work that was suspended while we were transferring.</li>
43  * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
44  * </ol>
45  * <p>
46  * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
47  * internal state.
48  *
49  * @author Thomas Pantelis
50  */
51 public class RaftActorLeadershipTransferCohort implements Runnable {
52     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
53
54     private final RaftActor raftActor;
55     private final ActorRef replyTo;
56     private Cancellable newLeaderTimer;
57     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
58     private long newLeaderTimeoutInMillis = 2000;
59     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
60     private boolean isTransferring;
61
62     RaftActorLeadershipTransferCohort(RaftActor raftActor, ActorRef replyTo) {
63         this.raftActor = raftActor;
64         this.replyTo = replyTo;
65     }
66
67     void init() {
68         RaftActorContext context = raftActor.getRaftActorContext();
69         RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
70
71         transferTimer.start();
72
73         Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
74         if(roleChangeNotifier.isPresent()) {
75             roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
76                     currentBehavior.getLeaderPayloadVersion()), raftActor.self());
77         }
78
79         LeaderTransitioning leaderTransitioning = new LeaderTransitioning();
80         for(String peerId: context.getPeerIds()) {
81             ActorSelection followerActor = context.getPeerActorSelection(peerId);
82             if(followerActor != null) {
83                 followerActor.tell(leaderTransitioning, context.getActor());
84             }
85         }
86
87         raftActor.pauseLeader(this);
88     }
89
90     /**
91      * This method is invoked to run the leadership transfer.
92      */
93     @Override
94     public void run() {
95         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
96         // Sanity check...
97         if(behavior instanceof Leader) {
98             isTransferring = true;
99             ((Leader)behavior).transferLeadership(this);
100         } else {
101             LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
102             finish(true);
103         }
104     }
105
106     /**
107      * This method is invoked to abort leadership transfer on failure.
108      */
109     public void abortTransfer() {
110         LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
111         finish(false);
112     }
113
114     /**
115      * This method is invoked when leadership transfer was carried out and complete.
116      */
117     public void transferComplete() {
118         LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
119
120         // We'll give it a little time for the new leader to be elected to give the derived class a
121         // chance to possibly complete work that was suspended while we were transferring. The
122         // RequestVote message from the new leader candidate should cause us to step down as leader
123         // and convert to follower due to higher term. We should then get an AppendEntries heart
124         // beat with the new leader id.
125
126         // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
127         // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
128         // safely run on actor's thread dispatcher.
129         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
130         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
131                 new Runnable() {
132                     @Override
133                     public void run() {
134                         LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
135                         finish(true);
136                     }
137                 }, raftActor.getContext().system().dispatcher(), raftActor.self());
138     }
139
140     void onNewLeader(String newLeader) {
141         if(newLeader != null && newLeaderTimer != null) {
142             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
143             newLeaderTimer.cancel();
144             finish(true);
145         }
146     }
147
148     private void finish(boolean success) {
149         isTransferring = false;
150         if(transferTimer.isRunning()) {
151             transferTimer.stop();
152             if(success) {
153                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
154                         raftActor.getLeaderId(), transferTimer.toString());
155             } else {
156                 LOG.info("{}: Failed to transfer leadership in {}", raftActor.persistenceId(),
157                         transferTimer.toString());
158             }
159         }
160
161         for(OnComplete onComplete: onCompleteCallbacks) {
162             if(success) {
163                 onComplete.onSuccess(raftActor.self(), replyTo);
164             } else {
165                 onComplete.onFailure(raftActor.self(), replyTo);
166             }
167         }
168     }
169
170     void addOnComplete(OnComplete onComplete) {
171         onCompleteCallbacks.add(onComplete);
172     }
173
174     boolean isTransferring() {
175         return isTransferring;
176     }
177
178     @VisibleForTesting
179     void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
180         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
181     }
182
183     interface OnComplete {
184         void onSuccess(ActorRef raftActorRef, ActorRef replyTo);
185         void onFailure(ActorRef raftActorRef, ActorRef replyTo);
186     }
187 }