BUG-5626: Move leaderId/leaderPayloadVersion fields
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.TimeUnit;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
19 import org.opendaylight.controller.cluster.raft.RaftActorContext;
20 import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
21 import org.opendaylight.controller.cluster.raft.RaftState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
24
25 /**
26  * The behavior of a RaftActor when it is in the Leader state
27  * <p/>
28  * Leaders:
29  * <ul>
30  * <li> Upon election: send initial empty AppendEntries RPCs
31  * (heartbeat) to each server; repeat during idle periods to
32  * prevent election timeouts (§5.2)
33  * <li> If command received from client: append entry to local log,
34  * respond after entry applied to state machine (§5.3)
35  * <li> If last log index ≥ nextIndex for a follower: send
36  * AppendEntries RPC with log entries starting at nextIndex
37  * <ul>
38  * <li> If successful: update nextIndex and matchIndex for
39  * follower (§5.3)
40  * <li> If AppendEntries fails because of log inconsistency:
41  * decrement nextIndex and retry (§5.3)
42  * </ul>
43  * <li> If there exists an N such that N > commitIndex, a majority
44  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
45  * set commitIndex = N (§5.3, §5.4).
46  */
47 public class Leader extends AbstractLeader {
48     /**
49      * Internal message sent to periodically check if this leader has become isolated and should transition
50      * to {@link IsolatedLeader}.
51      */
52     @VisibleForTesting
53     static final Object ISOLATED_LEADER_CHECK = new Object();
54
55     private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
56     private @Nullable LeadershipTransferContext leadershipTransferContext;
57
58     public Leader(RaftActorContext context) {
59         super(context, RaftState.Leader);
60     }
61
62     @Override
63     public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
64         Preconditions.checkNotNull(sender, "sender should not be null");
65
66         if (ISOLATED_LEADER_CHECK.equals(originalMessage) && isLeaderIsolated()) {
67             LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
68                 context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
69
70             return internalSwitchBehavior(RaftState.IsolatedLeader);
71         }
72
73         return super.handleMessage(sender, originalMessage);
74     }
75
76     @Override
77     protected void beforeSendHeartbeat(){
78         if(isolatedLeaderCheck.elapsed(TimeUnit.MILLISECONDS) > context.getConfigParams().getIsolatedCheckIntervalInMillis()){
79             context.getActor().tell(ISOLATED_LEADER_CHECK, context.getActor());
80             isolatedLeaderCheck.reset().start();
81         }
82
83         if(leadershipTransferContext != null && leadershipTransferContext.isExpired(
84                 context.getConfigParams().getElectionTimeOutInterval().toMillis())) {
85             LOG.debug("{}: Leadership transfer expired", logName());
86             leadershipTransferContext = null;
87         }
88     }
89
90     @Override
91     protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
92         RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
93         tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
94         return returnBehavior;
95     }
96
97     /**
98      * Attempts to transfer leadership to a follower as per the raft paper (§3.10) as follows:
99      * <ul>
100      * <li>Start a timer (Stopwatch).</li>
101      * <li>Send an initial AppendEntries heartbeat to all followers.</li>
102      * <li>On AppendEntriesReply, check if the follower's new match Index matches the leader's last index</li>
103      * <li>If it matches, </li>
104      *   <ul>
105      *   <li>Send an additional AppendEntries to ensure the follower has applied all its log entries to its state.</li>
106      *   <li>Send an ElectionTimeout to the follower to immediately start an election.</li>
107      *   <li>Notify {@link RaftActorLeadershipTransferCohort#transferComplete}.</li>
108      *   </ul>
109      * <li>Otherwise if the election time out period elapses, notify
110      *     {@link RaftActorLeadershipTransferCohort#abortTtransfer}.</li>
111      * </ul>
112      *
113      * @param leadershipTransferCohort
114      */
115     public void transferLeadership(@Nonnull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
116         LOG.debug("{}: Attempting to transfer leadership", logName());
117
118         leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
119
120         // Send an immediate heart beat to the followers.
121         sendAppendEntries(0, false);
122     }
123
124     private void tryToCompleteLeadershipTransfer(String followerId) {
125         if(leadershipTransferContext == null) {
126             return;
127         }
128
129         FollowerLogInformation followerInfo = getFollower(followerId);
130         if(followerInfo == null) {
131             return;
132         }
133
134         long lastIndex = context.getReplicatedLog().lastIndex();
135         boolean isVoting = context.getPeerInfo(followerId).isVoting();
136
137         LOG.debug("{}: tryToCompleteLeadershipTransfer: followerId: {}, matchIndex: {}, lastIndex: {}, isVoting: {}",
138                 logName(), followerId, followerInfo.getMatchIndex(), lastIndex, isVoting);
139
140         if(isVoting && followerInfo.getMatchIndex() == lastIndex) {
141             LOG.debug("{}: Follower's log matches - sending ElectionTimeout", logName());
142
143             // We can't be sure if the follower has applied all its log entries to its state so send an
144             // additional AppendEntries with the latest commit index.
145             sendAppendEntries(0, false);
146
147             // Now send an ElectionTimeout to the matching follower to immediately start an election.
148             ActorSelection followerActor = context.getPeerActorSelection(followerId);
149             followerActor.tell(ElectionTimeout.INSTANCE, context.getActor());
150
151             LOG.debug("{}: Leader transfer complete", logName());
152
153             leadershipTransferContext.transferCohort.transferComplete();
154             leadershipTransferContext = null;
155         }
156     }
157
158     @Override
159     public void close() {
160         if(leadershipTransferContext != null) {
161             leadershipTransferContext.transferCohort.abortTransfer();
162         }
163
164         super.close();
165     }
166
167     @VisibleForTesting
168     void markFollowerActive(String followerId) {
169         getFollower(followerId).markFollowerActive();
170     }
171
172     @VisibleForTesting
173     void markFollowerInActive(String followerId) {
174         getFollower(followerId).markFollowerInActive();
175     }
176
177     private static class LeadershipTransferContext {
178         RaftActorLeadershipTransferCohort transferCohort;
179         Stopwatch timer = Stopwatch.createStarted();
180
181         LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
182             this.transferCohort = transferCohort;
183         }
184
185         boolean isExpired(long timeout) {
186             if(timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
187                 transferCohort.abortTransfer();
188                 return true;
189             }
190
191             return false;
192         }
193     }
194 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.