/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
16 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
19 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
22 import scala.concurrent.duration.FiniteDuration;
24 import java.util.Random;
25 import java.util.concurrent.TimeUnit;
/**
 * Abstract class that represents the behavior of a RaftActor.
 * <p>
 * Rules that apply to all servers:
 * <ul>
 * <li> If commitIndex > lastApplied: increment lastApplied, apply
 * log[lastApplied] to state machine (§5.3)
 * <li> If RPC request or response contains term T > currentTerm:
 * set currentTerm = T, convert to follower (§5.1)
 * </ul>
 */
37 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
40 * Information about the RaftActor whose behavior this class represents
42 protected final RaftActorContext context;
45 * The maximum election time variance
47 private static final int ELECTION_TIME_MAX_VARIANCE = 100;
50 * The interval at which a heart beat message will be sent to the remote
53 * Since this is set to 100 milliseconds the Election timeout should be
54 * at least 200 milliseconds
56 protected static final FiniteDuration HEART_BEAT_INTERVAL =
57 new FiniteDuration(100, TimeUnit.MILLISECONDS);
60 * The interval in which a new election would get triggered if no leader is found
62 private static final long ELECTION_TIME_INTERVAL =
63 HEART_BEAT_INTERVAL.toMillis() * 2;
69 private Cancellable electionCancel = null;
/**
 * Creates a behavior bound to the supplied actor context.
 *
 * @param context information about the RaftActor this behavior acts for;
 *                the reference is stored as given, without validation
 */
protected AbstractRaftActorBehavior(final RaftActorContext context) {
    this.context = context;
}
77 * Derived classes should not directly handle AppendEntries messages it
78 * should let the base class handle it first. Once the base class handles
79 * the AppendEntries message and does the common actions that are applicable
80 * in all RaftState's it will delegate the handling of the AppendEntries
81 * message to the derived class to do more state specific handling by calling
84 * @param sender The actor that sent this message
85 * @param appendEntries The AppendEntries message
86 * @param suggestedState The state that the RaftActor should be in based
87 * on the base class's processing of the AppendEntries
91 protected abstract RaftState handleAppendEntries(ActorRef sender,
92 AppendEntries appendEntries, RaftState suggestedState);
95 protected RaftState appendEntries(ActorRef sender,
96 AppendEntries appendEntries, RaftState raftState){
98 // 1. Reply false if term < currentTerm (§5.1)
99 if(appendEntries.getTerm() < currentTerm()){
100 sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
104 // 2. Reply false if log doesn’t contain an entry at prevLogIndex
105 // whose term matches prevLogTerm (§5.3)
106 ReplicatedLogEntry previousEntry = context.getReplicatedLog()
107 .get(appendEntries.getPrevLogIndex());
109 if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
110 sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
114 if(appendEntries.getEntries() != null) {
115 // 3. If an existing entry conflicts with a new one (same index
116 // but different terms), delete the existing entry and all that
118 int addEntriesFrom = 0;
120 i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
121 ReplicatedLogEntry newEntry = context.getReplicatedLog()
124 if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
127 if (newEntry != null && newEntry.getTerm() != appendEntries
128 .getEntries().get(i).getTerm()) {
129 context.getReplicatedLog().removeFrom(i + 1);
134 // 4. Append any new entries not already in the log
135 for (int i = addEntriesFrom;
136 i < appendEntries.getEntries().size(); i++) {
137 context.getReplicatedLog()
138 .append(appendEntries.getEntries().get(i));
143 // 5. If leaderCommit > commitIndex, set commitIndex =
144 // min(leaderCommit, index of last new entry)
145 context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
146 context.getReplicatedLog().last().getIndex()));
148 // If commitIndex > lastApplied: increment lastApplied, apply
149 // log[lastApplied] to state machine (§5.3)
150 if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
151 applyLogToStateMachine(appendEntries.getLeaderCommit());
154 sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
156 return handleAppendEntries(sender, appendEntries, raftState);
160 * Derived classes should not directly handle AppendEntriesReply messages it
161 * should let the base class handle it first. Once the base class handles
162 * the AppendEntriesReply message and does the common actions that are
163 * applicable in all RaftState's it will delegate the handling of the
164 * AppendEntriesReply message to the derived class to do more state specific
165 * handling by calling this method
167 * @param sender The actor that sent this message
168 * @param appendEntriesReply The AppendEntriesReply message
169 * @param suggestedState The state that the RaftActor should be in based
170 * on the base class's processing of the
171 * AppendEntriesReply message
175 protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
176 AppendEntriesReply appendEntriesReply, RaftState suggestedState);
178 protected RaftState requestVote(ActorRef sender,
179 RequestVote requestVote, RaftState suggestedState) {
181 boolean grantVote = false;
183 // Reply false if term < currentTerm (§5.1)
184 if (requestVote.getTerm() < currentTerm()) {
187 // If votedFor is null or candidateId, and candidate’s log is at
188 // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
189 } else if (votedFor() == null || votedFor()
190 .equals(requestVote.getCandidateId())) {
192 boolean candidateLatest = false;
195 // Raft determines which of two logs is more up-to-date
196 // by comparing the index and term of the last entries in the
197 // logs. If the logs have last entries with different terms, then
198 // the log with the later term is more up-to-date. If the logs
199 // end with the same term, then whichever log is longer is
201 if (requestVote.getLastLogTerm() > lastTerm()) {
202 candidateLatest = true;
203 } else if ((requestVote.getLastLogTerm() == lastTerm())
204 && requestVote.getLastLogIndex() >= lastTerm()) {
205 candidateLatest = true;
208 if (candidateLatest) {
210 context.getTermInformation().update(requestVote.getTerm(),
211 requestVote.getCandidateId());
215 sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
217 return suggestedState;
221 * Derived classes should not directly handle RequestVoteReply messages it
222 * should let the base class handle it first. Once the base class handles
223 * the RequestVoteReply message and does the common actions that are
224 * applicable in all RaftState's it will delegate the handling of the
225 * RequestVoteReply message to the derived class to do more state specific
226 * handling by calling this method
228 * @param sender The actor that sent this message
229 * @param requestVoteReply The RequestVoteReply message
230 * @param suggestedState The state that the RaftActor should be in based
231 * on the base class's processing of the RequestVote
236 protected abstract RaftState handleRequestVoteReply(ActorRef sender,
237 RequestVoteReply requestVoteReply, RaftState suggestedState);
240 * @return The derived class should return the state that corresponds to
243 protected abstract RaftState state();
245 protected FiniteDuration electionDuration() {
246 long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
247 return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
248 TimeUnit.MILLISECONDS);
251 protected void scheduleElection(FiniteDuration interval) {
253 if (electionCancel != null && !electionCancel.isCancelled()) {
254 electionCancel.cancel();
257 // Schedule an election. When the scheduler triggers an ElectionTimeout
258 // message is sent to itself
260 context.getActorSystem().scheduler().scheduleOnce(interval,
261 context.getActor(), new ElectionTimeout(),
262 context.getActorSystem().dispatcher(), context.getActor());
265 protected long currentTerm() {
266 return context.getTermInformation().getCurrentTerm();
269 protected String votedFor() {
270 return context.getTermInformation().getVotedFor();
273 protected ActorRef actor() {
274 return context.getActor();
277 protected long lastTerm() {
278 return context.getReplicatedLog().last().getTerm();
281 protected long lastIndex() {
282 return context.getReplicatedLog().last().getIndex();
287 public RaftState handleMessage(ActorRef sender, Object message) {
288 RaftState raftState = state();
289 if (message instanceof RaftRPC) {
290 raftState = applyTerm((RaftRPC) message);
292 if (message instanceof AppendEntries) {
293 raftState = appendEntries(sender, (AppendEntries) message,
295 } else if (message instanceof AppendEntriesReply) {
297 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
299 } else if (message instanceof RequestVote) {
301 requestVote(sender, (RequestVote) message, raftState);
302 } else if (message instanceof RequestVoteReply) {
304 handleRequestVoteReply(sender, (RequestVoteReply) message,
310 private RaftState applyTerm(RaftRPC rpc) {
311 // If RPC request or response contains term T > currentTerm:
312 // set currentTerm = T, convert to follower (§5.1)
313 // This applies to all RPC messages and responses
314 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
315 context.getTermInformation().update(rpc.getTerm(), null);
316 return RaftState.Follower;
321 private void applyLogToStateMachine(long index) {
322 // Send a local message to the local RaftActor (it's derived class to be
323 // specific to apply the log to it's index)
324 context.setLastApplied(index);