Migrate most of CDS to use java.util.Optional
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Stopwatch;
17 import java.util.concurrent.TimeUnit;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
26
27 /**
28  * The behavior of a RaftActor when it is in the Leader state.
29  *
30  * <p>
31  * Leaders:
32  * <ul>
33  * <li> Upon election: send initial empty AppendEntries RPCs
34  * (heartbeat) to each server; repeat during idle periods to
35  * prevent election timeouts (§5.2)
36  * <li> If command received from client: append entry to local log,
37  * respond after entry applied to state machine (§5.3)
38  * <li> If last log index ≥ nextIndex for a follower: send
39  * AppendEntries RPC with log entries starting at nextIndex
40  * <li> If successful: update nextIndex and matchIndex for
41  * follower (§5.3)
42  * <li> If AppendEntries fails because of log inconsistency:
43  * decrement nextIndex and retry (§5.3)
44  * <li> If there exists an N such that N &gt; commitIndex, a majority
45  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46  * set commitIndex = N (§5.3, §5.4).
47  * </ul>
48  */
49 public class Leader extends AbstractLeader {
50     /**
51      * Internal message sent to periodically check if this leader has become isolated and should transition
52      * to {@link IsolatedLeader}.
53      */
54     @VisibleForTesting
55     static final Object ISOLATED_LEADER_CHECK = new Object();
56
57     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
58     private @Nullable LeadershipTransferContext leadershipTransferContext;
59
60     Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
61         super(context, RaftState.Leader, initializeFromLeader);
62     }
63
64     public Leader(RaftActorContext context) {
65         this(context, null);
66     }
67
68     @Override
69     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
70         requireNonNull(sender, "sender should not be null");
71
72         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
73             if (isLeaderIsolated()) {
74                 log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
75                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
76                 return internalSwitchBehavior(new IsolatedLeader(context, this));
77             } else {
78                 return this;
79             }
80         } else {
81             return super.handleMessage(sender, originalMessage);
82         }
83     }
84
85     @Override
86     protected void beforeSendHeartbeat() {
87         if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
88                 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
89             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
90             isolatedLeaderCheck.reset().start();
91         }
92
93         if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
94                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
95             log.debug("{}: Leadership transfer expired", logName());
96             leadershipTransferContext = null;
97         }
98     }
99
100     @Override
101     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
102         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
103         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
104         return returnBehavior;
105     }
106
107     /**
108      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
109      * <ul>
110      * <li>Start a timer (Stopwatch).</li>
111      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
112      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
113      * <li>If it matches,
114      *   <ul>
115      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
116      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
117      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
118      *   </ul></li>
119      * <li>Otherwise if the election time out period elapses, notify
120      *     {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
121      * </ul>
122      *
123      * @param leadershipTransferCohort the cohort participating in the leadership transfer
124      */
125     public void transferLeadership(@NonNull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
126         log.debug("{}: Attempting to transfer leadership", logName());
127
128         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
129
130         // Send an immediate heart beat to the followers.
131         sendAppendEntries(0, false);
132     }
133
134     private void tryToCompleteLeadershipTransfer(String followerId) {
135         if (leadershipTransferContext == null) {
136             return;
137         }
138
139         final Optional<String> requestedFollowerIdOptional
140                 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
141         if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
142             // we want to transfer leadership to specific follower
143             return;
144         }
145
146         FollowerLogInformation followerInfo = getFollower(followerId);
147         if (followerInfo == null) {
148             return;
149         }
150
151         long lastIndex = context.getReplicatedLog().lastIndex();
152         boolean isVoting = context.getPeerInfo(followerId).isVoting();
153
154         log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
155                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
156
157         if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
158             log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
159
160             // We can't be sure if the follower has applied all its log entries to its state so send an
161             // additional AppendEntries with the latest commit index.
162             sendAppendEntries(0, false);
163
164             // Now send a TimeoutNow message to the matching follower to immediately start an election.
165             ActorSelection followerActor = context.getPeerActorSelection(followerId);
166             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
167
168             log.debug("{}: Leader transfer complete", logName());
169
170             leadershipTransferContext.transferCohort.transferComplete();
171             leadershipTransferContext = null;
172         }
173     }
174
175     @Override
176     public void close() {
177         if (leadershipTransferContext != null) {
178             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
179             leadershipTransferContext = null;
180             localLeadershipTransferContext.transferCohort.abortTransfer();
181         }
182
183         super.close();
184     }
185
186     @VisibleForTesting
187     void markFollowerActive(String followerId) {
188         getFollower(followerId).markFollowerActive();
189     }
190
191     @VisibleForTesting
192     void markFollowerInActive(String followerId) {
193         getFollower(followerId).markFollowerInActive();
194     }
195
196     private static class LeadershipTransferContext {
197         RaftActorLeadershipTransferCohort transferCohort;
198         Stopwatch timer = Stopwatch.createStarted();
199
200         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
201             this.transferCohort = transferCohort;
202         }
203
204         boolean isExpired(long timeout) {
205             if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
206                 transferCohort.abortTransfer();
207                 return true;
208             }
209
210             return false;
211         }
212     }
213 }