Turn ElectionTimeout into a proper singleton
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25
26 /**
27  * The behavior of a RaftActor when it is in the Leader state
28  * <p/>
29  * Leaders:
30  * <ul>
31  * <li> Upon election: send initial empty AppendEntries RPCs
32  * (heartbeat) to each server; repeat during idle periods to
33  * prevent election timeouts (§5.2)
34  * <li> If command received from client: append entry to local log,
35  * respond after entry applied to state machine (§5.3)
36  * <li> If last log index ≥ nextIndex for a follower: send
37  * AppendEntries RPC with log entries starting at nextIndex
38  * <ul>
39  * <li> If successful: update nextIndex and matchIndex for
40  * follower (§5.3)
41  * <li> If AppendEntries fails because of log inconsistency:
42  * decrement nextIndex and retry (§5.3)
43  * </ul>
44  * <li> If there exists an N such that N > commitIndex, a majority
45  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46  * set commitIndex = N (§5.3, §5.4).
47  */
48 public class Leader extends AbstractLeader {
49     private static final IsolatedLeaderCheck ISOLATED_LEADER_CHECK = new IsolatedLeaderCheck();
50     private final Stopwatch isolatedLeaderCheck;
51     private @Nullable LeadershipTransferContext leadershipTransferContext;
52
53     public Leader(RaftActorContext context) {
54         super(context);
55         isolatedLeaderCheck = Stopwatch.createStarted();
56     }
57
58     @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
59         Preconditions.checkNotNull(sender, "sender should not be null");
60
61         if (originalMessage instanceof IsolatedLeaderCheck) {
62             if (isLeaderIsolated()) {
63                 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
64                         context.getId(), getMinIsolatedLeaderPeerCount(), leaderId);
65
66                 return internalSwitchBehavior(RaftState.IsolatedLeader);
67             }
68         }
69
70         return super.handleMessage(sender, originalMessage);
71     }
72
73     @Override
74     protected void beforeSendHeartbeat(){
75         if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
76             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
77             isolatedLeaderCheck.reset().start();
78         }
79
80         if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
81                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
82             LOG.debug("{}: Leadership transfer expired", logName());
83             leadershipTransferContext = null;
84         }
85     }
86
87     @Override
88     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
89         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
90         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
91         return returnBehavior;
92     }
93
94     /**
95      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
96      * <ul>
97      * <li>Start a timer (Stopwatch).</li>
98      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
99      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
100      * <li>If it matches, </li>
101      *   <ul>
102      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
103      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
104      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
105      *   </ul>
106      * <li>Otherwise if the election time out period elapses, notify
107      *     {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
108      * </ul>
109      *
110      * @param leadershipTransferCohort
111      */
112     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
113         LOG.debug("{}: Attempting to transfer leadership", logName());
114
115         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
116
117         // Send an immediate heart beat to the followers.
118         sendAppendEntries(0, false);
119     }
120
121     private void tryToCompleteLeadershipTransfer(String followerId) {
122         if(leadershipTransferContext == null) {
123             return;
124         }
125
126         FollowerLogInformation followerInfo = getFollower(followerId);
127         if(followerInfo == null) {
128             return;
129         }
130
131         long lastIndex = context.getReplicatedLog().lastIndex();
132         boolean isVoting = context.getPeerInfo(followerId).isVoting();
133
134         LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
135                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
136
137         if(isVoting && followerInfo.getMatchIndex() == lastIndex) {
138             LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
139
140             // We can't be sure if the follower has applied all its log entries to its state so send an
141             // additional AppendEntries with the latest commit index.
142             sendAppendEntries(0, false);
143
144             // Now send an ElectionTimeout to the matching follower to immediately start an election.
145             ActorSelection followerActor = context.getPeerActorSelection(followerId);
146             followerActor.tell(ElectionTimeout.INSTANCE, context.getActor());
147
148             LOG.debug("{}: Leader transfer complete", logName());
149
150             leadershipTransferContext.transferCohort.transferComplete();
151             leadershipTransferContext = null;
152         }
153     }
154
155     @Override
156     public void close() throws Exception {
157         if(leadershipTransferContext != null) {
158             leadershipTransferContext.transferCohort.abortTransfer();
159         }
160
161         super.close();
162     }
163
164     @VisibleForTesting
165     void markFollowerActive(String followerId) {
166         getFollower(followerId).markFollowerActive();
167     }
168
169     @VisibleForTesting
170     void markFollowerInActive(String followerId) {
171         getFollower(followerId).markFollowerInActive();
172     }
173
174     private static class LeadershipTransferContext {
175         RaftActorLeadershipTransferCohort transferCohort;
176         Stopwatch timer = Stopwatch.createStarted();
177
178         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
179             this.transferCohort = transferCohort;
180         }
181
182         boolean isExpired(long timeout) {
183             if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
184                 transferCohort.abortTransfer();
185                 return true;
186             }
187
188             return false;
189         }
190     }
191 }