Fix javadocs and enable doclint
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
20 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
21 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.duration.FiniteDuration;
25
26 /**
27  * A raft actor support class that participates in leadership transfer. An instance is created upon
28  * initialization of leadership transfer.
29  *
30  * <p>
31  * The transfer process is as follows:
32  * <ol>
33  * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
34  *     clients that we no longer have a working leader.</li>
35  * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
36  *     their local RoleChangeNotifiers.</li>
37  * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
38  *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
39  * <li>When the pause is complete, the run method is called which in turn calls
40  *     {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
41  * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
42  * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
43  *     possibly complete work that was suspended while we were transferring.</li>
44  * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
45  * </ol>
46  *
47  * <p>
48  * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
49  * internal state.
50  *
51  * @author Thomas Pantelis
52  */
53 public class RaftActorLeadershipTransferCohort {
54     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
55
56     private final RaftActor raftActor;
57     private Cancellable newLeaderTimer;
58     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
59     private long newLeaderTimeoutInMillis = 2000;
60     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
61     private boolean isTransferring;
62
63     RaftActorLeadershipTransferCohort(RaftActor raftActor) {
64         this.raftActor = raftActor;
65     }
66
67     void init() {
68         RaftActorContext context = raftActor.getRaftActorContext();
69         RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
70
71         transferTimer.start();
72
73         Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
74         if (roleChangeNotifier.isPresent()) {
75             roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
76                     currentBehavior.getLeaderPayloadVersion()), raftActor.self());
77         }
78
79         for (String peerId: context.getPeerIds()) {
80             ActorSelection followerActor = context.getPeerActorSelection(peerId);
81             if (followerActor != null) {
82                 followerActor.tell(LeaderTransitioning.INSTANCE, context.getActor());
83             }
84         }
85
86         raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
87             @Override
88             protected void doRun() {
89                 doTransfer();
90             }
91
92             @Override
93             protected void doCancel() {
94                 LOG.debug("{}: pauseLeader timed out - aborting transfer", raftActor.persistenceId());
95                 abortTransfer();
96             }
97         });
98     }
99
100     /**
101      * This method is invoked to perform the leadership transfer.
102      */
103     @VisibleForTesting
104     void doTransfer() {
105         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
106         // Sanity check...
107         if (behavior instanceof Leader) {
108             isTransferring = true;
109             ((Leader)behavior).transferLeadership(this);
110         } else {
111             LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
112             finish(true);
113         }
114     }
115
116     /**
117      * This method is invoked to abort leadership transfer on failure.
118      */
119     public void abortTransfer() {
120         LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
121         finish(false);
122     }
123
124     /**
125      * This method is invoked when leadership transfer was carried out and complete.
126      */
127     public void transferComplete() {
128         LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
129
130         // We'll give it a little time for the new leader to be elected to give the derived class a
131         // chance to possibly complete work that was suspended while we were transferring. The
132         // RequestVote message from the new leader candidate should cause us to step down as leader
133         // and convert to follower due to higher term. We should then get an AppendEntries heart
134         // beat with the new leader id.
135
136         // Add a timer in case we don't get a leader change - 2 sec should be plenty of time if a new
137         // leader is elected. Note: the Runnable is sent as a message to the raftActor which executes it
138         // safely run on the actor's thread dispatcher.
139         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
140         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
141             (Runnable) () -> {
142                 LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
143                 finish(true);
144             }, raftActor.getContext().system().dispatcher(), raftActor.self());
145     }
146
147     void onNewLeader(String newLeader) {
148         if (newLeader != null && newLeaderTimer != null) {
149             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
150             newLeaderTimer.cancel();
151             finish(true);
152         }
153     }
154
155     private void finish(boolean success) {
156         isTransferring = false;
157         if (transferTimer.isRunning()) {
158             transferTimer.stop();
159             if (success) {
160                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
161                         raftActor.getLeaderId(), transferTimer);
162             } else {
163                 LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
164             }
165         }
166
167         for (OnComplete onComplete: onCompleteCallbacks) {
168             if (success) {
169                 onComplete.onSuccess(raftActor.self());
170             } else {
171                 onComplete.onFailure(raftActor.self());
172             }
173         }
174     }
175
176     void addOnComplete(OnComplete onComplete) {
177         onCompleteCallbacks.add(onComplete);
178     }
179
180     boolean isTransferring() {
181         return isTransferring;
182     }
183
184     @VisibleForTesting
185     void setNewLeaderTimeoutInMillis(long newLeaderTimeoutInMillis) {
186         this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
187     }
188
189     interface OnComplete {
190         void onSuccess(ActorRef raftActorRef);
191
192         void onFailure(ActorRef raftActorRef);
193     }
194 }