Measure follower activity in nanoseconds
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.concurrent.TimeUnit;
17 import javax.annotation.Nonnull;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25
26 /**
27  * The behavior of a RaftActor when it is in the Leader state.
28  *
29  * <p>
30  * Leaders:
31  * <ul>
32  * <li> Upon election: send initial empty AppendEntries RPCs
33  * (heartbeat) to each server; repeat during idle periods to
34  * prevent election timeouts (§5.2)
35  * <li> If command received from client: append entry to local log,
36  * respond after entry applied to state machine (§5.3)
37  * <li> If last log index ≥ nextIndex for a follower: send
38  * AppendEntries RPC with log entries starting at nextIndex
39  * <li> If successful: update nextIndex and matchIndex for
40  * follower (§5.3)
41  * <li> If AppendEntries fails because of log inconsistency:
42  * decrement nextIndex and retry (§5.3)
43  * <li> If there exists an N such that N &gt; commitIndex, a majority
44  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
45  * set commitIndex = N (§5.3, §5.4).
46  * </ul>
47  */
48 public class Leader extends AbstractLeader {
49     /**
50      * Internal message sent to periodically check if this leader has become isolated and should transition
51      * to {@link IsolatedLeader}.
52      */
53     @VisibleForTesting
54     static final Object ISOLATED_LEADER_CHECK = new Object();
55
56     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
57     @Nullable private LeadershipTransferContext leadershipTransferContext;
58
59     Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
60         super(context, RaftState.Leader, initializeFromLeader);
61     }
62
63     public Leader(RaftActorContext context) {
64         this(context, null);
65     }
66
67     @Override
68     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
69         Preconditions.checkNotNull(sender, "sender should not be null");
70
71         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
72             if (isLeaderIsolated()) {
73                 log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
74                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
75                 return internalSwitchBehavior(new IsolatedLeader(context, this));
76             } else {
77                 return this;
78             }
79         } else {
80             return super.handleMessage(sender, originalMessage);
81         }
82     }
83
84     @Override
85     protected void beforeSendHeartbeat() {
86         if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
87                 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
88             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
89             isolatedLeaderCheck.reset().start();
90         }
91
92         if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
93                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
94             log.debug("{}: Leadership transfer expired", logName());
95             leadershipTransferContext = null;
96         }
97     }
98
99     @Override
100     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
101         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
102         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
103         return returnBehavior;
104     }
105
106     /**
107      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
108      * <ul>
109      * <li>Start a timer (Stopwatch).</li>
110      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
111      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
112      * <li>If it matches,
113      *   <ul>
114      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
115      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
116      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
117      *   </ul></li>
118      * <li>Otherwise if the election time out period elapses, notify
119      *     {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
120      * </ul>
121      *
122      * @param leadershipTransferCohort the cohort participating in the leadership transfer
123      */
124     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
125         log.debug("{}: Attempting to transfer leadership", logName());
126
127         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
128
129         // Send an immediate heart beat to the followers.
130         sendAppendEntries(0, false);
131     }
132
133     private void tryToCompleteLeadershipTransfer(String followerId) {
134         if (leadershipTransferContext == null) {
135             return;
136         }
137
138         final Optional<String> requestedFollowerIdOptional
139                 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
140         if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
141             // we want to transfer leadership to specific follower
142             return;
143         }
144
145         FollowerLogInformation followerInfo = getFollower(followerId);
146         if (followerInfo == null) {
147             return;
148         }
149
150         long lastIndex = context.getReplicatedLog().lastIndex();
151         boolean isVoting = context.getPeerInfo(followerId).isVoting();
152
153         log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
154                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
155
156         if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
157             log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
158
159             // We can't be sure if the follower has applied all its log entries to its state so send an
160             // additional AppendEntries with the latest commit index.
161             sendAppendEntries(0, false);
162
163             // Now send a TimeoutNow message to the matching follower to immediately start an election.
164             ActorSelection followerActor = context.getPeerActorSelection(followerId);
165             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
166
167             log.debug("{}: Leader transfer complete", logName());
168
169             leadershipTransferContext.transferCohort.transferComplete();
170             leadershipTransferContext = null;
171         }
172     }
173
174     @Override
175     public void close() {
176         if (leadershipTransferContext != null) {
177             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
178             leadershipTransferContext = null;
179             localLeadershipTransferContext.transferCohort.abortTransfer();
180         }
181
182         super.close();
183     }
184
185     @VisibleForTesting
186     void markFollowerActive(String followerId) {
187         getFollower(followerId).markFollowerActive();
188     }
189
190     @VisibleForTesting
191     void markFollowerInActive(String followerId) {
192         getFollower(followerId).markFollowerInActive();
193     }
194
195     private static class LeadershipTransferContext {
196         RaftActorLeadershipTransferCohort transferCohort;
197         Stopwatch timer = Stopwatch.createStarted();
198
199         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
200             this.transferCohort = transferCohort;
201         }
202
203         boolean isExpired(long timeout) {
204             if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
205                 transferCohort.abortTransfer();
206                 return true;
207             }
208
209             return false;
210         }
211     }
212 }