e7409363de35bef9a2eb93a069bf8f8fdded9fa0
[controller.git] /
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Stopwatch;
14 import java.util.Optional;
15 import java.util.concurrent.TimeUnit;
16 import org.apache.pekko.actor.ActorRef;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * The behavior of a RaftActor when it is in the Leader state.
30  *
31  * <p>Leaders:
32  * <ul>
33  * <li> Upon election: send initial empty AppendEntries RPCs
34  * (heartbeat) to each server; repeat during idle periods to
35  * prevent election timeouts (§5.2)
36  * <li> If command received from client: append entry to local log,
37  * respond after entry applied to state machine (§5.3)
38  * <li> If last log index ≥ nextIndex for a follower: send
39  * AppendEntries RPC with log entries starting at nextIndex
40  * <li> If successful: update nextIndex and matchIndex for
41  * follower (§5.3)
42  * <li> If AppendEntries fails because of log inconsistency:
43  * decrement nextIndex and retry (§5.3)
44  * <li> If there exists an N such that N &gt; commitIndex, a majority
45  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46  * set commitIndex = N (§5.3, §5.4).
47  * </ul>
48  */
49 // Non-final for testing
50 public non-sealed class Leader extends AbstractLeader {
51     private static final Logger LOG = LoggerFactory.getLogger(Leader.class);
52
53     /**
54      * Internal message sent to periodically check if this leader has become isolated and should transition
55      * to {@link IsolatedLeader}.
56      */
57     @VisibleForTesting
58     static final Object ISOLATED_LEADER_CHECK = new Object();
59
60     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
61     private @Nullable LeadershipTransferContext leadershipTransferContext;
62
63     Leader(final RaftActorContext context, final IsolatedLeader initializeFromLeader) {
64         super(context, RaftState.Leader, requireNonNull(initializeFromLeader));
65     }
66
67     Leader(final RaftActorContext context, final PreLeader initializeFromLeader) {
68         super(context, RaftState.Leader, requireNonNull(initializeFromLeader));
69     }
70
71     @VisibleForTesting
72     public Leader(final RaftActorContext context) {
73         super(context, RaftState.Leader);
74     }
75
76     @Override
77     public RaftActorBehavior handleMessage(final ActorRef sender, final Object originalMessage) {
78         requireNonNull(sender, "sender should not be null");
79
80         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
81             if (isLeaderIsolated()) {
82                 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
83                     logName, getMinIsolatedLeaderPeerCount(), getLeaderId());
84                 return internalSwitchBehavior(new IsolatedLeader(context, this));
85             } else {
86                 return this;
87             }
88         } else {
89             return super.handleMessage(sender, originalMessage);
90         }
91     }
92
93     @Override
94     protected void beforeSendHeartbeat() {
95         if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
96                 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
97             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
98             isolatedLeaderCheck.reset().start();
99         }
100
101         if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
102                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
103             LOG.debug("{}: Leadership transfer expired", logName);
104             leadershipTransferContext = null;
105         }
106     }
107
108     @Override
109     RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
110         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
111         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
112         return returnBehavior;
113     }
114
115     /**
116      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
117      * <ul>
118      * <li>Start a timer (Stopwatch).</li>
119      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
120      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
121      * <li>If it matches,
122      *   <ul>
123      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
124      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
125      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
126      *   </ul></li>
127      * <li>Otherwise if the election time out period elapses, notify
128      *     {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
129      * </ul>
130      *
131      * @param leadershipTransferCohort the cohort participating in the leadership transfer
132      */
133     public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
134         LOG.debug("{}: Attempting to transfer leadership", logName);
135
136         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
137
138         // Send an immediate heart beat to the followers.
139         sendAppendEntries(0, false);
140     }
141
142     private void tryToCompleteLeadershipTransfer(final String followerId) {
143         if (leadershipTransferContext == null) {
144             return;
145         }
146
147         final Optional<String> requestedFollowerIdOptional
148                 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
149         if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.orElseThrow().equals(followerId)) {
150             // we want to transfer leadership to specific follower
151             return;
152         }
153
154         FollowerLogInformation followerInfo = getFollower(followerId);
155         if (followerInfo == null) {
156             return;
157         }
158
159         long lastIndex = context.getReplicatedLog().lastIndex();
160         boolean isVoting = context.getPeerInfo(followerId).isVoting();
161
162         LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
163                 logName, followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
164
165         if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
166             LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName);
167
168             // We can't be sure if the follower has applied all its log entries to its state so send an
169             // additional AppendEntries with the latest commit index.
170             sendAppendEntries(0, false);
171
172             // Now send a TimeoutNow message to the matching follower to immediately start an election.
173             final var followerActor = context.getPeerActorSelection(followerId);
174             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
175
176             LOG.debug("{}: Leader transfer complete", logName);
177
178             leadershipTransferContext.transferCohort.transferComplete();
179             leadershipTransferContext = null;
180         }
181     }
182
183     @Override
184     public void close() {
185         if (leadershipTransferContext != null) {
186             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
187             leadershipTransferContext = null;
188             localLeadershipTransferContext.transferCohort.abortTransfer();
189         }
190
191         super.close();
192     }
193
194     @VisibleForTesting
195     void markFollowerActive(final String followerId) {
196         getFollower(followerId).markFollowerActive();
197     }
198
199     @VisibleForTesting
200     void markFollowerInActive(final String followerId) {
201         getFollower(followerId).markFollowerInActive();
202     }
203
204     private static class LeadershipTransferContext {
205         RaftActorLeadershipTransferCohort transferCohort;
206         Stopwatch timer = Stopwatch.createStarted();
207
208         LeadershipTransferContext(final RaftActorLeadershipTransferCohort transferCohort) {
209             this.transferCohort = transferCohort;
210         }
211
212         boolean isExpired(final long timeout) {
213             if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
214                 transferCohort.abortTransfer();
215                 return true;
216             }
217
218             return false;
219         }
220     }
221 }