2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
* The behavior of a RaftActor when it is in the Leader state.
31 * <li> Upon election: send initial empty AppendEntries RPCs
32 * (heartbeat) to each server; repeat during idle periods to
33 * prevent election timeouts (§5.2)
34 * <li> If command received from client: append entry to local log,
35 * respond after entry applied to state machine (§5.3)
36 * <li> If last log index ≥ nextIndex for a follower: send
37 * AppendEntries RPC with log entries starting at nextIndex
* <li> If successful: update nextIndex and matchIndex for that follower (§5.3)
41 * <li> If AppendEntries fails because of log inconsistency:
42 * decrement nextIndex and retry (§5.3)
44 * <li> If there exists an N such that N > commitIndex, a majority
45 * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46 * set commitIndex = N (§5.3, §5.4).
48 public class Leader extends AbstractLeader {
/** Message this actor sends to itself to trigger an isolated-leader check; one shared instance is reused. */
private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();

/**
 * Time elapsed since the last isolated-leader check was sent, or since this behavior was created.
 * Started right after the superclass constructor returns; restarted by beforeSendHeartbeat().
 */
private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();

/** The leadership transfer currently in progress, or {@code null} when none is pending. */
private @Nullable LeadershipTransferContext leadershipTransferContext;

/**
 * Creates the Leader behavior.
 *
 * @param context the owning RaftActor's context, passed through to {@link AbstractLeader}
 */
public Leader(RaftActorContext context) {
    super(context);
}
58 @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
59 Preconditions.checkNotNull(sender, "sender should not be null");
61 if (originalMessage instanceof IsolatedLeaderCheck) {
62 if (isLeaderIsolated()) {
63 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
64 context.getId(), getMinIsolatedLeaderPeerCount(), leaderId);
66 return internalSwitchBehavior(RaftState.IsolatedLeader);
70 return super.handleMessage(sender, originalMessage);
74 protected void beforeSendHeartbeat(){
75 if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
76 context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
77 isolatedLeaderCheck.reset().start();
80 if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
81 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
82 LOG.debug("{}: Leadership transfer expired", logName());
83 leadershipTransferContext = null;
88 protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
89 RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
90 tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
91 return returnBehavior;
94 public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
95 if(!context.hasFollowers()) {
96 leadershipTransferCohort.transferComplete();
100 LOG.debug("{}: Attempting to transfer leadership", logName());
102 leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
104 // Send an immediate heart beat to the followers.
105 sendAppendEntries(0, false);
108 private void tryToCompleteLeadershipTransfer(String followerId) {
109 if(leadershipTransferContext == null) {
113 FollowerLogInformation followerInfo = getFollower(followerId);
114 if(followerInfo == null) {
118 long lastIndex = context.getReplicatedLog().lastIndex();
120 LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}",
121 logName(), followerId, followerInfo.getMatchIndex(), lastIndex);
123 if(followerInfo.getMatchIndex() == lastIndex) {
124 LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
126 // We can't be sure if the follower has applied all its log entries to its state so send an
127 // additional AppendEntries.
128 sendAppendEntries(0, false);
130 // Now send an ElectionTimeout to the matching follower to immediately start an election.
131 ActorSelection followerActor = context.getPeerActorSelection(followerId);
132 followerActor.tell(new ElectionTimeout(), context.getActor());
134 LOG.debug("{}: Leader transfer complete", logName());
136 leadershipTransferContext.transferCohort.transferComplete();
137 leadershipTransferContext = null;
142 public void close() throws Exception {
143 if(leadershipTransferContext != null) {
144 leadershipTransferContext.transferCohort.abortTransfer();
151 void markFollowerActive(String followerId) {
152 getFollower(followerId).markFollowerActive();
156 void markFollowerInActive(String followerId) {
157 getFollower(followerId).markFollowerInActive();
160 private static class LeadershipTransferContext {
161 RaftActorLeadershipTransferCohort transferCohort;
162 Stopwatch timer = Stopwatch.createStarted();
164 LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
165 this.transferCohort = transferCohort;
168 boolean isExpired(long timeout) {
169 if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
170 transferCohort.abortTransfer();