2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.concurrent.TimeUnit;
17 import javax.annotation.Nonnull;
18 import javax.annotation.Nullable;
19 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
20 import org.opendaylight.controller.cluster.raft.RaftActorContext;
21 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
27 * The behavior of a RaftActor when it is in the Leader state.
32 * <li> Upon election: send initial empty AppendEntries RPCs
33 * (heartbeat) to each server; repeat during idle periods to
34 * prevent election timeouts (§5.2)
35 * <li> If command received from client: append entry to local log,
36 * respond after entry applied to state machine (§5.3)
37 * <li> If last log index ≥ nextIndex for a follower: send
38 * AppendEntries RPC with log entries starting at nextIndex
39 * <li> If successful: update nextIndex and matchIndex for the follower (§5.3)
41 * <li> If AppendEntries fails because of log inconsistency:
42 * decrement nextIndex and retry (§5.3)
43 * <li> If there exists an N such that N > commitIndex, a majority
44 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
45 * set commitIndex = N (§5.3, §5.4).
48 public class Leader extends AbstractLeader {
/**
 * Internal message sent to periodically check if this leader has become isolated and should transition
 * to {@link IsolatedLeader}.
 */
static final Object ISOLATED_LEADER_CHECK = new Object();

// Measures the time since ISOLATED_LEADER_CHECK was last sent to this actor; it is compared against the
// configured isolated-check interval and restarted in beforeSendHeartbeat().
private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();

// The leadership transfer currently in progress, or null when no transfer is pending.
@Nullable
private LeadershipTransferContext leadershipTransferContext;

/**
 * Creates a Leader behavior for the given actor context with no prior leader state.
 *
 * @param context the raft actor context
 */
public Leader(RaftActorContext context) {
    this(context, null);
}

/**
 * Creates a Leader behavior, forwarding {@code initializeFromLeader} to {@link AbstractLeader}.
 * NOTE(review): presumably this lets the new behavior reuse state from a previous leader behavior;
 * confirm against AbstractLeader's constructor.
 *
 * @param context the raft actor context
 * @param initializeFromLeader an existing leader behavior to initialize from, or null
 */
Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
    super(context, RaftState.Leader, initializeFromLeader);
}
68 public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
69 Preconditions.checkNotNull(sender, "sender should not be null");
71 if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
72 if (isLeaderIsolated()) {
73 log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
74 context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
75 return internalSwitchBehavior(new IsolatedLeader(context, this));
80 return super.handleMessage(sender, originalMessage);
85 protected void beforeSendHeartbeat() {
86 if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
87 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
88 context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
89 isolatedLeaderCheck.reset().start();
92 if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
93 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
94 log.debug("{}: Leadership transfer expired", logName());
95 leadershipTransferContext = null;
100 protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
101 RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
102 tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
103 return returnBehavior;
107 * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
109 * <li>Start a timer (Stopwatch).</li>
110 * <li>Send an initial AppendEntries heartbeat to all followers.</li>
111 * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
114 * <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
115 * <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
116 * <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
118 * <li>Otherwise if the election time out period elapses, notify
119 * {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
122 * @param leadershipTransferCohort the cohort participating in the leadership transfer
124 public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
125 log.debug("{}: Attempting to transfer leadership", logName());
127 leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
129 // Send an immediate heart beat to the followers.
130 sendAppendEntries(0, false);
133 private void tryToCompleteLeadershipTransfer(String followerId) {
134 if (leadershipTransferContext == null) {
138 final Optional<String> requestedFollowerIdOptional
139 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
140 if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
141 // we want to transfer leadership to specific follower
145 FollowerLogInformation followerInfo = getFollower(followerId);
146 if (followerInfo == null) {
150 long lastIndex = context.getReplicatedLog().lastIndex();
151 boolean isVoting = context.getPeerInfo(followerId).isVoting();
153 log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
154 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
156 if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
157 log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
159 // We can't be sure if the follower has applied all its log entries to its state so send an
160 // additional AppendEntries with the latest commit index.
161 sendAppendEntries(0, false);
163 // Now send a TimeoutNow message to the matching follower to immediately start an election.
164 ActorSelection followerActor = context.getPeerActorSelection(followerId);
165 followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
167 log.debug("{}: Leader transfer complete", logName());
169 leadershipTransferContext.transferCohort.transferComplete();
170 leadershipTransferContext = null;
175 public void close() {
176 if (leadershipTransferContext != null) {
177 LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
178 leadershipTransferContext = null;
179 localLeadershipTransferContext.transferCohort.abortTransfer();
186 void markFollowerActive(String followerId) {
187 getFollower(followerId).markFollowerActive();
191 void markFollowerInActive(String followerId) {
192 getFollower(followerId).markFollowerInActive();
195 private static class LeadershipTransferContext {
196 RaftActorLeadershipTransferCohort transferCohort;
197 Stopwatch timer = Stopwatch.createStarted();
199 LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
200 this.transferCohort = transferCohort;
203 boolean isExpired(long timeout) {
204 if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
205 transferCohort.abortTransfer();