/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Abstract class that represents the behavior of a RaftActor.
 * <p>
 * Rules that apply to all servers:
 * <ul>
 * <li> If commitIndex &gt; lastApplied: increment lastApplied, apply
 * log[lastApplied] to state machine (§5.3)
 * <li> If RPC request or response contains term T &gt; currentTerm:
 * set currentTerm = T, convert to follower (§5.1)
 * </ul>
 */
39 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
/**
 * Information about the RaftActor whose behavior this class represents
 */
protected final RaftActorContext context;

/**
 * The maximum election time variance. Used as the exclusive upper bound
 * of the random number of milliseconds that electionDuration() adds to
 * ELECTION_TIME_INTERVAL.
 */
private static final int ELECTION_TIME_MAX_VARIANCE = 100;

/**
 * The interval at which a heart beat message will be sent to the remote
 * RaftActor.
 * <p>
 * Since this is set to 100 milliseconds the Election timeout should be
 * at least 200 milliseconds
 */
protected static final FiniteDuration HEART_BEAT_INTERVAL =
    new FiniteDuration(100, TimeUnit.MILLISECONDS);

/**
 * The interval in which a new election would get triggered if no leader is found.
 * Expressed in milliseconds; twice the heart beat interval.
 */
private static final long ELECTION_TIME_INTERVAL =
    HEART_BEAT_INTERVAL.toMillis() * 2;

/**
 * Cancellable handle that stopElection() cancels when it is non-null and
 * not already cancelled.
 * NOTE(review): presumably the pending ElectionTimeout timer - confirm
 * where it is assigned.
 */
private Cancellable electionCancel = null;

/**
 * Identifier of the leader, or null when none has been recorded.
 * NOTE(review): presumably assigned by derived classes - confirm.
 */
protected String leaderId = null;
/**
 * Creates a behavior bound to the given actor context.
 *
 * @param context information about the RaftActor whose behavior this
 *                instance represents
 */
protected AbstractRaftActorBehavior(RaftActorContext context) {
    this.context = context;
}
/**
 * Derived classes should not handle AppendEntries messages directly; they
 * should let the base class handle them first. Once the base class has
 * performed the common actions that are applicable in all RaftStates, it
 * delegates the AppendEntries message to the derived class for state
 * specific handling by calling this method.
 *
 * @param sender         The actor that sent this message
 * @param appendEntries  The AppendEntries message
 * @param suggestedState The state that the RaftActor should be in based
 *                       on the base class's processing of the AppendEntries
 *                       message
 * @return the RaftState selected by the derived class
 */
protected abstract RaftState handleAppendEntries(ActorRef sender,
    AppendEntries appendEntries, RaftState suggestedState);
101 protected RaftState appendEntries(ActorRef sender,
102 AppendEntries appendEntries, RaftState raftState) {
104 if (raftState != state()) {
105 context.getLogger().debug("Suggested state is " + raftState
106 + " current behavior state is " + state());
109 // 1. Reply false if term < currentTerm (§5.1)
110 if (appendEntries.getTerm() < currentTerm()) {
111 context.getLogger().debug(
112 "Cannot append entries because sender term " + appendEntries
113 .getTerm() + " is less than " + currentTerm());
115 new AppendEntriesReply(context.getId(), currentTerm(), false,
116 lastIndex(), lastTerm()), actor()
122 return handleAppendEntries(sender, appendEntries, raftState);
/**
 * Derived classes should not handle AppendEntriesReply messages directly;
 * they should let the base class handle them first. Once the base class
 * has performed the common actions that are applicable in all RaftStates,
 * it delegates the AppendEntriesReply message to the derived class for
 * state specific handling by calling this method.
 *
 * @param sender             The actor that sent this message
 * @param appendEntriesReply The AppendEntriesReply message
 * @param suggestedState     The state that the RaftActor should be in based
 *                           on the base class's processing of the
 *                           AppendEntriesReply message
 * @return the RaftState selected by the derived class
 */
protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
    AppendEntriesReply appendEntriesReply, RaftState suggestedState);
144 protected RaftState requestVote(ActorRef sender,
145 RequestVote requestVote, RaftState suggestedState) {
147 boolean grantVote = false;
149 // Reply false if term < currentTerm (§5.1)
150 if (requestVote.getTerm() < currentTerm()) {
153 // If votedFor is null or candidateId, and candidate’s log is at
154 // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
155 } else if (votedFor() == null || votedFor()
156 .equals(requestVote.getCandidateId())) {
158 boolean candidateLatest = false;
161 // Raft determines which of two logs is more up-to-date
162 // by comparing the index and term of the last entries in the
163 // logs. If the logs have last entries with different terms, then
164 // the log with the later term is more up-to-date. If the logs
165 // end with the same term, then whichever log is longer is
167 if (requestVote.getLastLogTerm() > lastTerm()) {
168 candidateLatest = true;
169 } else if ((requestVote.getLastLogTerm() == lastTerm())
170 && requestVote.getLastLogIndex() >= lastIndex()) {
171 candidateLatest = true;
174 if (candidateLatest) {
176 context.getTermInformation().update(requestVote.getTerm(),
177 requestVote.getCandidateId());
181 sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
183 return suggestedState;
/**
 * Derived classes should not handle RequestVoteReply messages directly;
 * they should let the base class handle them first. Once the base class
 * has performed the common actions that are applicable in all RaftStates,
 * it delegates the RequestVoteReply message to the derived class for
 * state specific handling by calling this method.
 *
 * @param sender           The actor that sent this message
 * @param requestVoteReply The RequestVoteReply message
 * @param suggestedState   The state that the RaftActor should be in based
 *                         on the base class's processing of the
 *                         RequestVoteReply message
 * @return the RaftState selected by the derived class
 */
protected abstract RaftState handleRequestVoteReply(ActorRef sender,
    RequestVoteReply requestVoteReply, RaftState suggestedState);
205 protected FiniteDuration electionDuration() {
206 long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
207 return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
208 TimeUnit.MILLISECONDS);
211 protected void stopElection() {
212 if (electionCancel != null && !electionCancel.isCancelled()) {
213 electionCancel.cancel();
217 protected void scheduleElection(FiniteDuration interval) {
221 // Schedule an election. When the scheduler triggers an ElectionTimeout
222 // message is sent to itself
224 context.getActorSystem().scheduler().scheduleOnce(interval,
225 context.getActor(), new ElectionTimeout(),
226 context.getActorSystem().dispatcher(), context.getActor());
229 protected long currentTerm() {
230 return context.getTermInformation().getCurrentTerm();
233 protected String votedFor() {
234 return context.getTermInformation().getVotedFor();
237 protected ActorRef actor() {
238 return context.getActor();
241 protected long lastTerm() {
242 return context.getReplicatedLog().lastTerm();
245 protected long lastIndex() {
246 return context.getReplicatedLog().lastIndex();
249 protected ClientRequestTracker findClientRequestTracker(long logIndex) {
253 protected void applyLogToStateMachine(long index) {
254 // Now maybe we apply to the state machine
255 for (long i = context.getLastApplied() + 1;
256 i < index + 1; i++) {
257 ActorRef clientActor = null;
258 String identifier = null;
259 ClientRequestTracker tracker = findClientRequestTracker(i);
261 if (tracker != null) {
262 clientActor = tracker.getClientActor();
263 identifier = tracker.getIdentifier();
265 ReplicatedLogEntry replicatedLogEntry =
266 context.getReplicatedLog().get(i);
268 if (replicatedLogEntry != null) {
269 actor().tell(new ApplyState(clientActor, identifier,
270 replicatedLogEntry), actor());
272 context.getLogger().error(
273 "Missing index " + i + " from log. Cannot apply state.");
276 // Send a local message to the local RaftActor (it's derived class to be
277 // specific to apply the log to it's index)
278 context.setLastApplied(index);
282 public RaftState handleMessage(ActorRef sender, Object message) {
283 RaftState raftState = state();
284 if (message instanceof RaftRPC) {
285 raftState = applyTerm((RaftRPC) message);
287 if (message instanceof AppendEntries) {
288 raftState = appendEntries(sender, (AppendEntries) message,
290 } else if (message instanceof AppendEntriesReply) {
292 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
294 } else if (message instanceof RequestVote) {
296 requestVote(sender, (RequestVote) message, raftState);
297 } else if (message instanceof RequestVoteReply) {
299 handleRequestVoteReply(sender, (RequestVoteReply) message,
305 @Override public String getLeaderId() {
309 private RaftState applyTerm(RaftRPC rpc) {
310 // If RPC request or response contains term T > currentTerm:
311 // set currentTerm = T, convert to follower (§5.1)
312 // This applies to all RPC messages and responses
313 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
314 context.getTermInformation().update(rpc.getTerm(), null);
315 return RaftState.Follower;