Properly handle RequestVote in all states
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
16 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
18 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
19 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
21 import scala.concurrent.duration.FiniteDuration;
22
23 import java.util.Random;
24 import java.util.concurrent.TimeUnit;
25
26 /**
27  * Abstract class that represents the behavior of a RaftActor
28  * <p/>
29  * All Servers:
30  * <ul>
31  * <li> If commitIndex > lastApplied: increment lastApplied, apply
32  * log[lastApplied] to state machine (§5.3)
33  * <li> If RPC request or response contains term T > currentTerm:
34  * set currentTerm = T, convert to follower (§5.1)
35  */
36 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
37
38     /**
39      * Information about the RaftActor whose behavior this class represents
40      */
41     protected final RaftActorContext context;
42
43     /**
44      * The maximum election time variance
45      */
46     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
47
48     /**
49      * The interval in which a new election would get triggered if no leader is found
50      */
51     private static final long ELECTION_TIME_INTERVAL = Leader.HEART_BEAT_INTERVAL.toMillis() * 2;
52
53     /**
54      *
55      */
56
57     private Cancellable electionCancel = null;
58
59
60     protected AbstractRaftActorBehavior(RaftActorContext context) {
61         this.context = context;
62     }
63
64     /**
65      * Derived classes should not directly handle AppendEntries messages it
66      * should let the base class handle it first. Once the base class handles
67      * the AppendEntries message and does the common actions that are applicable
68      * in all RaftState's it will delegate the handling of the AppendEntries
69      * message to the derived class to do more state specific handling by calling
70      * this method
71      *
72      * @param sender         The actor that sent this message
73      * @param appendEntries  The AppendEntries message
74      * @param suggestedState The state that the RaftActor should be in based
75      *                       on the base class's processing of the AppendEntries
76      *                       message
77      * @return
78      */
79     protected abstract RaftState handleAppendEntries(ActorRef sender,
80         AppendEntries appendEntries, RaftState suggestedState);
81
82     /**
83      * Derived classes should not directly handle AppendEntriesReply messages it
84      * should let the base class handle it first. Once the base class handles
85      * the AppendEntriesReply message and does the common actions that are
86      * applicable in all RaftState's it will delegate the handling of the
87      * AppendEntriesReply message to the derived class to do more state specific
88      * handling by calling this method
89      *
90      * @param sender             The actor that sent this message
91      * @param appendEntriesReply The AppendEntriesReply message
92      * @param suggestedState     The state that the RaftActor should be in based
93      *                           on the base class's processing of the
94      *                           AppendEntriesReply message
95      * @return
96      */
97
98     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
99         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
100
101     protected RaftState handleRequestVote(ActorRef sender,
102         RequestVote requestVote, RaftState suggestedState){
103
104         boolean grantVote = false;
105
106         //  Reply false if term < currentTerm (§5.1)
107         if(requestVote.getTerm() < currentTerm()){
108             grantVote = false;
109
110         // If votedFor is null or candidateId, and candidate’s log is at
111         // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
112         } else if (votedFor() == null || votedFor().equals(requestVote.getCandidateId())) {
113
114             boolean candidateLatest = false;
115
116             // From §5.4.1
117             // Raft determines which of two logs is more up-to-date
118             // by comparing the index and term of the last entries in the
119             // logs. If the logs have last entries with different terms, then
120             // the log with the later term is more up-to-date. If the logs
121             // end with the same term, then whichever log is longer is
122             // more up-to-date.
123             if(requestVote.getLastLogTerm() > lastTerm()){
124                 candidateLatest = true;
125             } else if((requestVote.getLastLogTerm() == lastTerm()) && requestVote.getLastLogIndex() >= lastTerm()){
126                 candidateLatest = true;
127             }
128
129             if(candidateLatest) {
130                 grantVote = true;
131                 context.getTermInformation().update(requestVote.getTerm(),
132                     requestVote.getCandidateId());
133             }
134         }
135
136         sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
137
138         return suggestedState;
139     }
140
141     /**
142      * Derived classes should not directly handle RequestVoteReply messages it
143      * should let the base class handle it first. Once the base class handles
144      * the RequestVoteReply message and does the common actions that are
145      * applicable in all RaftState's it will delegate the handling of the
146      * RequestVoteReply message to the derived class to do more state specific
147      * handling by calling this method
148      *
149      * @param sender           The actor that sent this message
150      * @param requestVoteReply The RequestVoteReply message
151      * @param suggestedState   The state that the RaftActor should be in based
152      *                         on the base class's processing of the RequestVote
153      *                         message
154      * @return
155      */
156
157     protected abstract RaftState handleRequestVoteReply(ActorRef sender,
158         RequestVoteReply requestVoteReply, RaftState suggestedState);
159
160     /**
161      * @return The derived class should return the state that corresponds to
162      * it's behavior
163      */
164     protected abstract RaftState state();
165
166     protected FiniteDuration electionDuration(){
167         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
168         return new FiniteDuration(ELECTION_TIME_INTERVAL + variance, TimeUnit.MILLISECONDS);
169     }
170
171     protected void scheduleElection(FiniteDuration interval) {
172
173         if (electionCancel != null && !electionCancel.isCancelled()) {
174             electionCancel.cancel();
175         }
176
177         // Schedule an election. When the scheduler triggers an ElectionTimeout
178         // message is sent to itself
179         electionCancel =
180             context.getActorSystem().scheduler().scheduleOnce(interval,
181                 context.getActor(), new ElectionTimeout(),
182                 context.getActorSystem().dispatcher(), context.getActor());
183     }
184
185     protected long currentTerm(){
186         return context.getTermInformation().getCurrentTerm();
187     }
188
189     protected String votedFor(){
190         return context.getTermInformation().getVotedFor();
191     }
192
193     protected ActorRef actor(){
194         return context.getActor();
195     }
196
197     protected long lastTerm() {
198         return context.getReplicatedLog().last().getTerm();
199     }
200
201     protected long lastIndex() {
202         return context.getReplicatedLog().last().getIndex();
203     }
204
205
206     @Override
207     public RaftState handleMessage(ActorRef sender, Object message) {
208         RaftState raftState = state();
209         if (message instanceof RaftRPC) {
210             raftState = applyTerm((RaftRPC) message);
211         }
212         if (message instanceof AppendEntries) {
213             AppendEntries appendEntries = (AppendEntries) message;
214             if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
215                 applyLogToStateMachine(appendEntries.getLeaderCommit());
216             }
217             raftState = handleAppendEntries(sender, appendEntries, raftState);
218         } else if (message instanceof AppendEntriesReply) {
219             raftState =
220                 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
221                     raftState);
222         } else if (message instanceof RequestVote) {
223             raftState =
224                 handleRequestVote(sender, (RequestVote) message, raftState);
225         } else if (message instanceof RequestVoteReply) {
226             raftState =
227                 handleRequestVoteReply(sender, (RequestVoteReply) message,
228                     raftState);
229         }
230         return raftState;
231     }
232
233     private RaftState applyTerm(RaftRPC rpc) {
234         // If RPC request or response contains term T > currentTerm:
235         // set currentTerm = T, convert to follower (§5.1)
236         // This applies to all RPC messages and responses
237         if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
238             context.getTermInformation().update(rpc.getTerm(), null);
239             return RaftState.Follower;
240         }
241         return state();
242     }
243
244     private void applyLogToStateMachine(long index) {
245         // Send a local message to the local RaftActor (it's derived class to be
246         // specific to apply the log to it's index)
247         context.setLastApplied(index);
248     }
249
250
251 }