2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Stopwatch;
14 import java.util.Optional;
15 import java.util.concurrent.TimeUnit;
16 import org.apache.pekko.actor.ActorRef;
17 import org.eclipse.jdt.annotation.NonNull;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * The behavior of a RaftActor when it is in the Leader state.
33 * <li> Upon election: send initial empty AppendEntries RPCs
34 * (heartbeat) to each server; repeat during idle periods to
35 * prevent election timeouts (§5.2)
36 * <li> If command received from client: append entry to local log,
37 * respond after entry applied to state machine (§5.3)
38 * <li> If last log index ≥ nextIndex for a follower: send
39 * AppendEntries RPC with log entries starting at nextIndex
* <li> If successful: update nextIndex and matchIndex for follower (§5.3)
42 * <li> If AppendEntries fails because of log inconsistency:
43 * decrement nextIndex and retry (§5.3)
44 * <li> If there exists an N such that N > commitIndex, a majority
45 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46 * set commitIndex = N (§5.3, §5.4).
49 // Non-final for testing
50 public non-sealed class Leader extends AbstractLeader {
private static final Logger LOG = LoggerFactory.getLogger(Leader.class);

/**
 * Internal message sent to periodically check if this leader has become isolated and should transition
 * to {@link IsolatedLeader}.
 *
 * <p>This is a plain {@link Object}, so the {@code equals()} check in {@code handleMessage} amounts to an
 * identity comparison against this single instance.
 */
static final Object ISOLATED_LEADER_CHECK = new Object();

// Measures time since the last ISOLATED_LEADER_CHECK was posted; beforeSendHeartbeat() compares it against
// the configured isolated-check interval and resets it whenever a new check is sent.
private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
// State of the leadership transfer currently in progress, or null when no transfer is pending.
private @Nullable LeadershipTransferContext leadershipTransferContext;
/**
 * Creates a Leader initialized from an {@link IsolatedLeader}; the given behavior is handed to the
 * superclass constructor (presumably so its follower state carries over — confirm in AbstractLeader).
 *
 * @param context the raft actor context
 * @param initializeFromLeader the behavior to initialize from, must not be null
 */
Leader(final RaftActorContext context, final IsolatedLeader initializeFromLeader) {
    super(context, RaftState.Leader,
        requireNonNull(initializeFromLeader));
}

/**
 * Creates a Leader initialized from a {@link PreLeader}; the given behavior is handed to the superclass
 * constructor in the same way as for an {@link IsolatedLeader}.
 *
 * @param context the raft actor context
 * @param initializeFromLeader the behavior to initialize from, must not be null
 */
Leader(final RaftActorContext context, final PreLeader initializeFromLeader) {
    super(context, RaftState.Leader,
        requireNonNull(initializeFromLeader));
}

/**
 * Creates a Leader with no predecessor behavior to initialize from.
 *
 * @param context the raft actor context
 */
public Leader(final RaftActorContext context) {
    super(context,
        RaftState.Leader);
}
77 public RaftActorBehavior handleMessage(final ActorRef sender, final Object originalMessage) {
78 requireNonNull(sender, "sender should not be null");
80 if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
81 if (isLeaderIsolated()) {
82 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
83 logName, getMinIsolatedLeaderPeerCount(), getLeaderId());
84 return internalSwitchBehavior(new IsolatedLeader(context, this));
89 return super.handleMessage(sender, originalMessage);
94 protected void beforeSendHeartbeat() {
95 if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
96 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
97 context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
98 isolatedLeaderCheck.reset().start();
101 if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
102 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
103 LOG.debug("{}: Leadership transfer expired", logName);
104 leadershipTransferContext = null;
109 RaftActorBehavior handleAppendEntriesReply(final ActorRef sender, final AppendEntriesReply appendEntriesReply) {
110 RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
111 tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
112 return returnBehavior;
116 * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
118 * <li>Start a timer (Stopwatch).</li>
119 * <li>Send an initial AppendEntries heartbeat to all followers.</li>
120 * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
123 * <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
124 * <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
125 * <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
127 * <li>Otherwise if the election time out period elapses, notify
128 * {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
131 * @param leadershipTransferCohort the cohort participating in the leadership transfer
133 public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
134 LOG.debug("{}: Attempting to transfer leadership", logName);
136 leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
138 // Send an immediate heart beat to the followers.
139 sendAppendEntries(0, false);
142 private void tryToCompleteLeadershipTransfer(final String followerId) {
143 if (leadershipTransferContext == null) {
147 final Optional<String> requestedFollowerIdOptional
148 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
149 if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.orElseThrow().equals(followerId)) {
150 // we want to transfer leadership to specific follower
154 FollowerLogInformation followerInfo = getFollower(followerId);
155 if (followerInfo == null) {
159 long lastIndex = context.getReplicatedLog().lastIndex();
160 boolean isVoting = context.getPeerInfo(followerId).isVoting();
162 LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
163 logName, followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
165 if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
166 LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName);
168 // We can't be sure if the follower has applied all its log entries to its state so send an
169 // additional AppendEntries with the latest commit index.
170 sendAppendEntries(0, false);
172 // Now send a TimeoutNow message to the matching follower to immediately start an election.
173 final var followerActor = context.getPeerActorSelection(followerId);
174 followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
176 LOG.debug("{}: Leader transfer complete", logName);
178 leadershipTransferContext.transferCohort.transferComplete();
179 leadershipTransferContext = null;
184 public void close() {
185 if (leadershipTransferContext != null) {
186 LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
187 leadershipTransferContext = null;
188 localLeadershipTransferContext.transferCohort.abortTransfer();
195 void markFollowerActive(final String followerId) {
196 getFollower(followerId).markFollowerActive();
200 void markFollowerInActive(final String followerId) {
201 getFollower(followerId).markFollowerInActive();
204 private static class LeadershipTransferContext {
205 RaftActorLeadershipTransferCohort transferCohort;
206 Stopwatch timer = Stopwatch.createStarted();
208 LeadershipTransferContext(final RaftActorLeadershipTransferCohort transferCohort) {
209 this.transferCohort = transferCohort;
212 boolean isExpired(final long timeout) {
213 if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
214 transferCohort.abortTransfer();