Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Stopwatch;
16 import java.util.Optional;
17 import java.util.concurrent.TimeUnit;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.eclipse.jdt.annotation.Nullable;
20 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
25 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
26
27 /**
28  * The behavior of a RaftActor when it is in the Leader state.
29  *
30  * <p>
31  * Leaders:
32  * <ul>
33  * <li> Upon election: send initial empty AppendEntries RPCs
34  * (heartbeat) to each server; repeat during idle periods to
35  * prevent election timeouts (§5.2)
36  * <li> If command received from client: append entry to local log,
37  * respond after entry applied to state machine (§5.3)
38  * <li> If last log index ≥ nextIndex for a follower: send
39  * AppendEntries RPC with log entries starting at nextIndex
40  * <li> If successful: update nextIndex and matchIndex for
41  * follower (§5.3)
42  * <li> If AppendEntries fails because of log inconsistency:
43  * decrement nextIndex and retry (§5.3)
44  * <li> If there exists an N such that N &gt; commitIndex, a majority
45  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
46  * set commitIndex = N (§5.3, §5.4).
47  * </ul>
48  */
49 public class Leader extends AbstractLeader {
50     /**
51      * Internal message sent to periodically check if this leader has become isolated and should transition
52      * to {@link IsolatedLeader}.
53      */
54     @VisibleForTesting
55     static final Object ISOLATED_LEADER_CHECK = new Object();
56
57     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
58     private @Nullable LeadershipTransferContext leadershipTransferContext;
59
60     Leader(final RaftActorContext context, @Nullable final AbstractLeader initializeFromLeader) {
61         super(context, RaftState.Leader, initializeFromLeader);
62     }
63
64     public Leader(final RaftActorContext context) {
65         this(context, null);
66     }
67
68     @Override
69     public RaftActorBehavior handleMessage(final ActorRef sender, final Object originalMessage) {
70         requireNonNull(sender, "sender should not be null");
71
72         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
73             if (isLeaderIsolated()) {
74                 log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
75                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
76                 return internalSwitchBehavior(new IsolatedLeader(context, this));
77             } else {
78                 return this;
79             }
80         } else {
81             return super.handleMessage(sender, originalMessage);
82         }
83     }
84
85     @Override
86     protected void beforeSendHeartbeat() {
87         if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
88                 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
89             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
90             isolatedLeaderCheck.reset().start();
91         }
92
93         if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
94                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
95             log.debug("{}: Leadership transfer expired", logName());
96             leadershipTransferContext = null;
97         }
98     }
99
100     @Override
101     protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
102             final AppendEntriesReply appendEntriesReply) {
103         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
104         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
105         return returnBehavior;
106     }
107
108     /**
109      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
110      * <ul>
111      * <li>Start a timer (Stopwatch).</li>
112      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
113      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
114      * <li>If it matches,
115      *   <ul>
116      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
117      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
118      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
119      *   </ul></li>
120      * <li>Otherwise if the election time out period elapses, notify
121      *     {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
122      * </ul>
123      *
124      * @param leadershipTransferCohort the cohort participating in the leadership transfer
125      */
126     public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
127         log.debug("{}: Attempting to transfer leadership", logName());
128
129         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
130
131         // Send an immediate heart beat to the followers.
132         sendAppendEntries(0, false);
133     }
134
135     private void tryToCompleteLeadershipTransfer(final String followerId) {
136         if (leadershipTransferContext == null) {
137             return;
138         }
139
140         final Optional<String> requestedFollowerIdOptional
141                 = leadershipTransferContext.transferCohort.getRequestedFollowerId();
142         if (requestedFollowerIdOptional.isPresent() && !requestedFollowerIdOptional.get().equals(followerId)) {
143             // we want to transfer leadership to specific follower
144             return;
145         }
146
147         FollowerLogInformation followerInfo = getFollower(followerId);
148         if (followerInfo == null) {
149             return;
150         }
151
152         long lastIndex = context.getReplicatedLog().lastIndex();
153         boolean isVoting = context.getPeerInfo(followerId).isVoting();
154
155         log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
156                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
157
158         if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
159             log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
160
161             // We can't be sure if the follower has applied all its log entries to its state so send an
162             // additional AppendEntries with the latest commit index.
163             sendAppendEntries(0, false);
164
165             // Now send a TimeoutNow message to the matching follower to immediately start an election.
166             ActorSelection followerActor = context.getPeerActorSelection(followerId);
167             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
168
169             log.debug("{}: Leader transfer complete", logName());
170
171             leadershipTransferContext.transferCohort.transferComplete();
172             leadershipTransferContext = null;
173         }
174     }
175
176     @Override
177     public void close() {
178         if (leadershipTransferContext != null) {
179             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
180             leadershipTransferContext = null;
181             localLeadershipTransferContext.transferCohort.abortTransfer();
182         }
183
184         super.close();
185     }
186
187     @VisibleForTesting
188     void markFollowerActive(final String followerId) {
189         getFollower(followerId).markFollowerActive();
190     }
191
192     @VisibleForTesting
193     void markFollowerInActive(final String followerId) {
194         getFollower(followerId).markFollowerInActive();
195     }
196
197     private static class LeadershipTransferContext {
198         RaftActorLeadershipTransferCohort transferCohort;
199         Stopwatch timer = Stopwatch.createStarted();
200
201         LeadershipTransferContext(final RaftActorLeadershipTransferCohort transferCohort) {
202             this.transferCohort = transferCohort;
203         }
204
205         boolean isExpired(final long timeout) {
206             if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
207                 transferCohort.abortTransfer();
208                 return true;
209             }
210
211             return false;
212         }
213     }
214 }