2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft.behaviors;
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21 import scala.concurrent.duration.FiniteDuration;
23 import java.util.Random;
24 import java.util.concurrent.TimeUnit;
27 * Abstract class that represents the behavior of a RaftActor
31 * <li> If commitIndex > lastApplied: increment lastApplied, apply
32 * log[lastApplied] to state machine (§5.3)
33 * <li> If RPC request or response contains term T > currentTerm:
34 * set currentTerm = T, convert to follower (§5.1)
36 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
39 * Information about the RaftActor whose behavior this class represents
41 protected final RaftActorContext context;
44 * The maximum election time variance
46 private static final int ELECTION_TIME_MAX_VARIANCE = 100;
49 * The interval in which a new election would get triggered if no leader is found
51 private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
57 private Cancellable electionCancel = null;
60 protected AbstractRaftActorBehavior(RaftActorContext context) {
61 this.context = context;
65 * Derived classes should not directly handle AppendEntries messages it
66 * should let the base class handle it first. Once the base class handles
67 * the AppendEntries message and does the common actions that are applicable
68 * in all RaftState's it will delegate the handling of the AppendEntries
69 * message to the derived class to do more state specific handling by calling
72 * @param sender The actor that sent this message
73 * @param appendEntries The AppendEntries message
74 * @param suggestedState The state that the RaftActor should be in based
75 * on the base class's processing of the AppendEntries
79 protected abstract RaftState handleAppendEntries(ActorRef sender,
80 AppendEntries appendEntries, RaftState suggestedState);
83 * Derived classes should not directly handle AppendEntriesReply messages it
84 * should let the base class handle it first. Once the base class handles
85 * the AppendEntriesReply message and does the common actions that are
86 * applicable in all RaftState's it will delegate the handling of the
87 * AppendEntriesReply message to the derived class to do more state specific
88 * handling by calling this method
90 * @param sender The actor that sent this message
91 * @param appendEntriesReply The AppendEntriesReply message
92 * @param suggestedState The state that the RaftActor should be in based
93 * on the base class's processing of the
94 * AppendEntriesReply message
98 protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
99 AppendEntriesReply appendEntriesReply, RaftState suggestedState);
101 protected RaftState handleRequestVote(ActorRef sender,
102 RequestVote requestVote, RaftState suggestedState){
104 boolean grantVote = false;
106 // Reply false if term < currentTerm (§5.1)
107 if(requestVote.getTerm() < currentTerm()){
110 // If votedFor is null or candidateId, and candidate’s log is at
111 // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
112 } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
114 boolean candidateLatest = false;
117 // Raft determines which of two logs is more up-to-date
118 // by comparing the index and term of the last entries in the
119 // logs. If the logs have last entries with different terms, then
120 // the log with the later term is more up-to-date. If the logs
121 // end with the same term, then whichever log is longer is
123 if(requestVote.getLastLogTerm() > lastTerm()){
124 candidateLatest = true;
125 } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
126 candidateLatest = true;
129 if(candidateLatest) {
131 context.getTermInformation().update(requestVote.getTerm(),
132 requestVote.getCandidateId());
136 sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
138 return suggestedState;
142 * Derived classes should not directly handle RequestVoteReply messages it
143 * should let the base class handle it first. Once the base class handles
144 * the RequestVoteReply message and does the common actions that are
145 * applicable in all RaftState's it will delegate the handling of the
146 * RequestVoteReply message to the derived class to do more state specific
147 * handling by calling this method
149 * @param sender The actor that sent this message
150 * @param requestVoteReply The RequestVoteReply message
151 * @param suggestedState The state that the RaftActor should be in based
152 * on the base class's processing of the RequestVote
157 protected abstract RaftState handleRequestVoteReply(ActorRef sender,
158 RequestVoteReply requestVoteReply, RaftState suggestedState);
161 * @return The derived class should return the state that corresponds to
164 protected abstract RaftState state();
166 protected FiniteDuration electionDuration(){
167 long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
168 return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
171 protected void scheduleElection(FiniteDuration interval) {
173 if (electionCancel != null && !electionCancel.isCancelled()) {
174 electionCancel.cancel();
177 // Schedule an election. When the scheduler triggers an ElectionTimeout
178 // message is sent to itself
180 context.getActorSystem().scheduler().scheduleOnce(interval,
181 context.getActor(), new ElectionTimeout(),
182 context.getActorSystem().dispatcher(), context.getActor());
185 protected long currentTerm(){
186 return context.getTermInformation().getCurrentTerm();
189 protected String votedFor(){
190 return context.getTermInformation().getVotedFor();
193 protected ActorRef actor(){
194 return context.getActor();
197 protected long lastTerm() {
198 return context.getReplicatedLog().last().getTerm();
201 protected long lastIndex() {
202 return context.getReplicatedLog().last().getIndex();
207 public RaftState handleMessage(ActorRef sender, Object message) {
208 RaftState raftState = state();
209 if (message instanceof RaftRPC) {
210 raftState = applyTerm((RaftRPC) message);
212 if (message instanceof AppendEntries) {
213 AppendEntries appendEntries = (AppendEntries) message;
214 if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
215 applyLogToStateMachine(appendEntries.getLeaderCommit());
217 raftState = handleAppendEntries(sender, appendEntries, raftState);
218 } else if (message instanceof AppendEntriesReply) {
220 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
222 } else if (message instanceof RequestVote) {
224 handleRequestVote(sender, (RequestVote) message, raftState);
225 } else if (message instanceof RequestVoteReply) {
227 handleRequestVoteReply(sender, (RequestVoteReply) message,
233 private RaftState applyTerm(RaftRPC rpc) {
234 // If RPC request or response contains term T > currentTerm:
235 // set currentTerm = T, convert to follower (§5.1)
236 // This applies to all RPC messages and responses
237 if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
238 context.getTermInformation().update(rpc.getTerm(), null);
239 return RaftState.Follower;
244 private void applyLogToStateMachine(long index) {
245 // Send a local message to the local RaftActor (it's derived class to be
246 // specific to apply the log to it's index)
247 context.setLastApplied(index);