f5f3f51819be30fdeefb020ebf20100fa7c6cf29
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24
25 /**
26  * The behavior of a RaftActor when it is in the Leader state.
27  *
28  * <p>
29  * Leaders:
30  * <ul>
31  * <li> Upon election: send initial empty AppendEntries RPCs
32  * (heartbeat) to each server; repeat during idle periods to
33  * prevent election timeouts (§5.2)
34  * <li> If command received from client: append entry to local log,
35  * respond after entry applied to state machine (§5.3)
36  * <li> If last log index ≥ nextIndex for a follower: send
37  * AppendEntries RPC with log entries starting at nextIndex
38  * <li> If successful: update nextIndex and matchIndex for
39  * follower (§5.3)
40  * <li> If AppendEntries fails because of log inconsistency:
41  * decrement nextIndex and retry (§5.3)
42  * <li> If there exists an N such that N &gt; commitIndex, a majority
43  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
44  * set commitIndex = N (§5.3, §5.4).
45  * </ul>
46  */
47 public class Leader extends AbstractLeader {
48     /**
49      * Internal message sent to periodically check if this leader has become isolated and should transition
50      * to {@link IsolatedLeader}.
51      */
52     @VisibleForTesting
53     static final Object ISOLATED_LEADER_CHECK = new Object();
54
55     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
56     @Nullable private LeadershipTransferContext leadershipTransferContext;
57
58     Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
59         super(context, RaftState.Leader, initializeFromLeader);
60     }
61
62     public Leader(RaftActorContext context) {
63         this(context, null);
64     }
65
66     @Override
67     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
68         Preconditions.checkNotNull(sender, "sender should not be null");
69
70         if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
71             if (isLeaderIsolated()) {
72                 log.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
73                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
74                 return internalSwitchBehavior(new IsolatedLeader(context, this));
75             } else {
76                 return this;
77             }
78         } else {
79             return super.handleMessage(sender, originalMessage);
80         }
81     }
82
83     @Override
84     protected void beforeSendHeartbeat() {
85         if (isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS)
86                 > context.getConfigParams().getIsolatedCheckIntervalInMillis()) {
87             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
88             isolatedLeaderCheck.reset().start();
89         }
90
91         if (leadershipTransferContext != null && leadershipTransferContext.isExpired(
92                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
93             log.debug("{}: Leadership transfer expired", logName());
94             leadershipTransferContext = null;
95         }
96     }
97
98     @Override
99     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
100         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
101         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
102         return returnBehavior;
103     }
104
105     /**
106      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
107      * <ul>
108      * <li>Start a timer (Stopwatch).</li>
109      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
110      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
111      * <li>If it matches,
112      *   <ul>
113      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
114      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
115      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
116      *   </ul></li>
117      * <li>Otherwise if the election time out period elapses, notify
118      *     {@link RaftActorLeadershipTransferCohort#abortTransfer}.</li>
119      * </ul>
120      *
121      * @param leadershipTransferCohort the cohort participating in the leadership transfer
122      */
123     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
124         log.debug("{}: Attempting to transfer leadership", logName());
125
126         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
127
128         // Send an immediate heart beat to the followers.
129         sendAppendEntries(0, false);
130     }
131
132     private void tryToCompleteLeadershipTransfer(String followerId) {
133         if (leadershipTransferContext == null) {
134             return;
135         }
136
137         FollowerLogInformation followerInfo = getFollower(followerId);
138         if (followerInfo == null) {
139             return;
140         }
141
142         long lastIndex = context.getReplicatedLog().lastIndex();
143         boolean isVoting = context.getPeerInfo(followerId).isVoting();
144
145         log.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
146                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
147
148         if (isVoting && followerInfo.getMatchIndex() == lastIndex) {
149             log.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
150
151             // We can't be sure if the follower has applied all its log entries to its state so send an
152             // additional AppendEntries with the latest commit index.
153             sendAppendEntries(0, false);
154
155             // Now send a TimeoutNow message to the matching follower to immediately start an election.
156             ActorSelection followerActor = context.getPeerActorSelection(followerId);
157             followerActor.tell(TimeoutNow.INSTANCE, context.getActor());
158
159             log.debug("{}: Leader transfer complete", logName());
160
161             leadershipTransferContext.transferCohort.transferComplete();
162             leadershipTransferContext = null;
163         }
164     }
165
166     @Override
167     public void close() {
168         if (leadershipTransferContext != null) {
169             LeadershipTransferContext localLeadershipTransferContext = leadershipTransferContext;
170             leadershipTransferContext = null;
171             localLeadershipTransferContext.transferCohort.abortTransfer();
172         }
173
174         super.close();
175     }
176
177     @VisibleForTesting
178     void markFollowerActive(String followerId) {
179         getFollower(followerId).markFollowerActive();
180     }
181
182     @VisibleForTesting
183     void markFollowerInActive(String followerId) {
184         getFollower(followerId).markFollowerInActive();
185     }
186
187     private static class LeadershipTransferContext {
188         RaftActorLeadershipTransferCohort transferCohort;
189         Stopwatch timer = Stopwatch.createStarted();
190
191         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
192             this.transferCohort = transferCohort;
193         }
194
195         boolean isExpired(long timeout) {
196             if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
197                 transferCohort.abortTransfer();
198                 return true;
199             }
200
201             return false;
202         }
203     }
204 }