BUG-8618: introduce RaftActor.unpauseLeader()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorLeadershipTransferCohort.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Cancellable;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Stopwatch;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.Nullable;
20 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
21 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
22 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.duration.FiniteDuration;
26
27 /**
28  * A raft actor support class that participates in leadership transfer. An instance is created upon
29  * initialization of leadership transfer.
30  *
31  * <p>
32  * The transfer process is as follows:
33  * <ol>
34  * <li>Send a LeaderStateChanged message with a null leader Id to the local RoleChangeNotifier to notify
35  *     clients that we no longer have a working leader.</li>
36  * <li>Send a LeaderTransitioning message to each follower so each can send LeaderStateChanged messages to
37  *     their local RoleChangeNotifiers.</li>
38  * <li>Call {@link RaftActor#pauseLeader} passing this RaftActorLeadershipTransferCohort
39  *     instance. This allows derived classes to perform work prior to transferring leadership.</li>
40  * <li>When the pause is complete, the run method is called which in turn calls
41  *     {@link Leader#transferLeadership(RaftActorLeadershipTransferCohort)}.</li>
42  * <li>The Leader calls {@link #transferComplete} on successful completion.</li>
43  * <li>Wait a short period of time for the new leader to be elected to give the derived class a chance to
44  *     possibly complete work that was suspended while we were transferring.</li>
45  * <li>On notification of the new leader from the RaftActor or on time out, notify {@link OnComplete} callbacks.</li>
46  * </ol>
47  *
48  * <p>
49  * NOTE: All methods on this class must be called on the actor's thread dispatcher as they may access/modify
50  * internal state.
51  *
52  * @author Thomas Pantelis
53  */
54 public class RaftActorLeadershipTransferCohort {
55     private static final Logger LOG = LoggerFactory.getLogger(RaftActorLeadershipTransferCohort.class);
56
57     static final long USE_DEFAULT_LEADER_TIMEOUT = -1;
58
59     private final List<OnComplete> onCompleteCallbacks = new ArrayList<>();
60     private final Stopwatch transferTimer = Stopwatch.createUnstarted();
61     private final RaftActor raftActor;
62     private final String requestedFollowerId;
63
64     private long newLeaderTimeoutInMillis = 2000;
65     private Cancellable newLeaderTimer;
66     private boolean isTransferring;
67
68     RaftActorLeadershipTransferCohort(final RaftActor raftActor) {
69         this(raftActor, null);
70     }
71
72     RaftActorLeadershipTransferCohort(final RaftActor raftActor, @Nullable final String requestedFollowerId) {
73         this.raftActor = raftActor;
74         this.requestedFollowerId = requestedFollowerId;
75
76         // We'll wait an election timeout period for a new leader to be elected plus some cushion to take into
77         // account the variance.
78         final long electionTimeout = raftActor.getRaftActorContext().getConfigParams()
79                 .getElectionTimeOutInterval().toMillis();
80         final int variance = raftActor.getRaftActorContext().getConfigParams().getElectionTimeVariance();
81         newLeaderTimeoutInMillis = 2 * (electionTimeout + variance);
82     }
83
84     void init() {
85         RaftActorContext context = raftActor.getRaftActorContext();
86         RaftActorBehavior currentBehavior = raftActor.getCurrentBehavior();
87
88         transferTimer.start();
89
90         Optional<ActorRef> roleChangeNotifier = raftActor.getRoleChangeNotifier();
91         if (roleChangeNotifier.isPresent()) {
92             roleChangeNotifier.get().tell(raftActor.newLeaderStateChanged(context.getId(), null,
93                     currentBehavior.getLeaderPayloadVersion()), raftActor.self());
94         }
95
96         for (String peerId: context.getPeerIds()) {
97             ActorSelection followerActor = context.getPeerActorSelection(peerId);
98             if (followerActor != null) {
99                 followerActor.tell(new LeaderTransitioning(context.getId()), context.getActor());
100             }
101         }
102
103         raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
104             @Override
105             protected void doRun() {
106                 LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
107                 doTransfer();
108             }
109
110             @Override
111             protected void doCancel() {
112                 LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
113                 doTransfer();
114             }
115         });
116     }
117
118     /**
119      * This method is invoked to perform the leadership transfer.
120      */
121     @VisibleForTesting
122     void doTransfer() {
123         RaftActorBehavior behavior = raftActor.getCurrentBehavior();
124         // Sanity check...
125         if (behavior instanceof Leader) {
126             isTransferring = true;
127             ((Leader)behavior).transferLeadership(this);
128         } else {
129             LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
130             finish(true);
131         }
132     }
133
134     /**
135      * This method is invoked to abort leadership transfer on failure.
136      */
137     public void abortTransfer() {
138         LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
139         finish(false);
140     }
141
142     /**
143      * This method is invoked when leadership transfer was carried out and complete.
144      */
145     public void transferComplete() {
146         LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
147
148         // We'll give it a little time for the new leader to be elected to give the derived class a
149         // chance to possibly complete work that was suspended while we were transferring. The
150         // RequestVote message from the new leader candidate should cause us to step down as leader
151         // and convert to follower due to higher term. We should then get an AppendEntries heart
152         // beat with the new leader id.
153
154         // Add a timer in case we don't get a leader change. Note: the Runnable is sent as a message to the raftActor
155         // which executes it safely run on the actor's thread dispatcher.
156         FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
157         newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
158             (Runnable) () -> {
159                 LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
160                 finish(true);
161             }, raftActor.getContext().system().dispatcher(), raftActor.self());
162     }
163
164     void onNewLeader(final String newLeader) {
165         if (newLeader != null && newLeaderTimer != null) {
166             LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
167             newLeaderTimer.cancel();
168             finish(true);
169         }
170     }
171
172     private void finish(final boolean success) {
173         isTransferring = false;
174         if (transferTimer.isRunning()) {
175             transferTimer.stop();
176             if (success) {
177                 LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
178                         raftActor.getLeaderId(), transferTimer);
179             } else {
180                 LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
181                 raftActor.unpauseLeader();
182             }
183         }
184
185         for (OnComplete onComplete: onCompleteCallbacks) {
186             if (success) {
187                 onComplete.onSuccess(raftActor.self());
188             } else {
189                 onComplete.onFailure(raftActor.self());
190             }
191         }
192     }
193
194     void addOnComplete(final OnComplete onComplete) {
195         onCompleteCallbacks.add(onComplete);
196     }
197
198     boolean isTransferring() {
199         return isTransferring;
200     }
201
202     void setNewLeaderTimeoutInMillis(final long newLeaderTimeoutInMillis) {
203         if (newLeaderTimeoutInMillis != USE_DEFAULT_LEADER_TIMEOUT) {
204             this.newLeaderTimeoutInMillis = newLeaderTimeoutInMillis;
205         }
206     }
207
208     public Optional<String> getRequestedFollowerId() {
209         return Optional.fromNullable(requestedFollowerId);
210     }
211
212     interface OnComplete {
213         void onSuccess(ActorRef raftActorRef);
214
215         void onFailure(ActorRef raftActorRef);
216     }
217 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.