6bbb70ce6b71ebb2ed7077b4962235fbf13f5cdf
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25
26 /**
27  * The behavior of a RaftActor when it is in the Leader state
28  * <p/>
29  * Leaders:
30  * <ul>
31  * <li> Upon election: send initial empty AppendEntries RPCs
32  * (heartbeat) to each server; repeat during idle periods to
33  * prevent election timeouts (§5.2)
34  * <li> If command received from client: append entry to local log,
35  * respond after entry applied to state machine (§5.3)
36  * <li> If last log index ≥ nextIndex for a follower: send
37  * AppendEntries RPC with log entries starting at nextIndex
38  * <ul>
39  * <li> If successful: update nextIndex and matchIndex for
40  * follower (§5.3)
41  * <li> If AppendEntries fails because of log inconsistency:
42  * decrement nextIndex and retry (§5.3)
43  * </ul>
44  * <li> If there exists an N such that N > commitIndex, a majority
45  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46  * set commitIndex = N (§5.3, §5.4).
47  */
48 public class Leader extends AbstractLeader {
49     private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
50     private final Stopwatch isolatedLeaderCheck;
51     private @Nullable LeadershipTransferContext leadershipTransferContext;
52
53     public Leader(RaftActorContext context) {
54         super(context);
55         isolatedLeaderCheck = Stopwatch.createStarted();
56     }
57
58     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
59         Preconditions.checkNotNull(sender, "sender should not be null");
60
61         if (originalMessage instanceof IsolatedLeaderCheck) {
62             if (isLeaderIsolated()) {
63                 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
64                         context.getId(), getMinIsolatedLeaderPeerCount(), leaderId);
65
66                 return internalSwitchBehavior(RaftState.IsolatedLeader);
67             }
68         }
69
70         return super.handleMessage(sender, originalMessage);
71     }
72
73     @Override
74     protected void beforeSendHeartbeat(){
75         if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
76             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
77             isolatedLeaderCheck.reset().start();
78         }
79
80         if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
81                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
82             LOG.debug("{}: Leadership transfer expired", logName());
83             leadershipTransferContext = null;
84         }
85     }
86
87     @Override
88     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
89         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
90         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
91         return returnBehavior;
92     }
93
94     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
95         if(!context.hasFollowers()) {
96             leadershipTransferCohort.transferComplete();
97             return;
98         }
99
100         LOG.debug("{}: Attempting to transfer leadership", logName());
101
102         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
103
104         // Send an immediate heart beat to the followers.
105         sendAppendEntries(0, false);
106     }
107
108     private void tryToCompleteLeadershipTransfer(String followerId) {
109         if(leadershipTransferContext == null) {
110             return;
111         }
112
113         FollowerLogInformation followerInfo = getFollower(followerId);
114         if(followerInfo == null) {
115             return;
116         }
117
118         long lastIndex = context.getReplicatedLog().lastIndex();
119
120         LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}",
121                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex);
122
123         if(followerInfo.getMatchIndex() == lastIndex) {
124             LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
125
126             // We can't be sure if the follower has applied all its log entries to its state so send an
127             // additional AppendEntries.
128             sendAppendEntries(0, false);
129
130             // Now send an ElectionTimeout to the matching follower to immediately start an election.
131             ActorSelection followerActor = context.getPeerActorSelection(followerId);
132             followerActor.tell(new ElectionTimeout(), context.getActor());
133
134             LOG.debug("{}: Leader transfer complete", logName());
135
136             leadershipTransferContext.transferCohort.transferComplete();
137             leadershipTransferContext = null;
138         }
139     }
140
141     @Override
142     public void close() throws Exception {
143         if(leadershipTransferContext != null) {
144             leadershipTransferContext.transferCohort.abortTransfer();
145         }
146
147         super.close();
148     }
149
150     @VisibleForTesting
151     void markFollowerActive(String followerId) {
152         getFollower(followerId).markFollowerActive();
153     }
154
155     @VisibleForTesting
156     void markFollowerInActive(String followerId) {
157         getFollower(followerId).markFollowerInActive();
158     }
159
160     private static class LeadershipTransferContext {
161         RaftActorLeadershipTransferCohort transferCohort;
162         Stopwatch timer = Stopwatch.createStarted();
163
164         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
165             this.transferCohort = transferCohort;
166         }
167
168         boolean isExpired(long timeout) {
169             if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
170                 transferCohort.abortTransfer();
171                 return true;
172             }
173
174             return false;
175         }
176     }
177 }