167082711d82b46c61e90e8311473d051e06461c
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import org.opendaylight.controller.cluster.raft.RaftActorContext;
14 import org.opendaylight.controller.cluster.raft.RaftState;
15 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
16 import org.opendaylight.controller.cluster.raft.internal.messages.ElectionTimeout;
17 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
18 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
19 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
20 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
21 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
22 import scala.concurrent.duration.FiniteDuration;
23
24 import java.util.Random;
25 import java.util.concurrent.TimeUnit;
26
27 /**
28  * Abstract class that represents the behavior of a RaftActor
29  * <p/>
30  * All Servers:
31  * <ul>
32  * <li> If commitIndex > lastApplied: increment lastApplied, apply
33  * log[lastApplied] to state machine (§5.3)
34  * <li> If RPC request or response contains term T > currentTerm:
35  * set currentTerm = T, convert to follower (§5.1)
36  */
37 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
38
39     /**
40      * Information about the RaftActor whose behavior this class represents
41      */
42     protected final RaftActorContext context;
43
44     /**
45      * The maximum election time variance
46      */
47     private static final int ELECTION_TIME_MAX_VARIANCE = 100;
48
49     /**
50      * The interval at which a heart beat message will be sent to the remote
51      * RaftActor
52      * <p/>
53      * Since this is set to 100 milliseconds the Election timeout should be
54      * at least 200 milliseconds
55      */
56     protected static final FiniteDuration HEART_BEAT_INTERVAL =
57         new FiniteDuration(100, TimeUnit.MILLISECONDS);
58
59     /**
60      * The interval in which a new election would get triggered if no leader is found
61      */
62     private static final long ELECTION_TIME_INTERVAL =
63         HEART_BEAT_INTERVAL.toMillis() * 2;
64
65     /**
66      *
67      */
68
69     private Cancellable electionCancel = null;
70
71
72     protected AbstractRaftActorBehavior(RaftActorContext context) {
73         this.context = context;
74     }
75
76     /**
77      * Derived classes should not directly handle AppendEntries messages it
78      * should let the base class handle it first. Once the base class handles
79      * the AppendEntries message and does the common actions that are applicable
80      * in all RaftState's it will delegate the handling of the AppendEntries
81      * message to the derived class to do more state specific handling by calling
82      * this method
83      *
84      * @param sender         The actor that sent this message
85      * @param appendEntries  The AppendEntries message
86      * @param suggestedState The state that the RaftActor should be in based
87      *                       on the base class's processing of the AppendEntries
88      *                       message
89      * @return
90      */
91     protected abstract RaftState handleAppendEntries(ActorRef sender,
92         AppendEntries appendEntries, RaftState suggestedState);
93
94
95     protected RaftState appendEntries(ActorRef sender,
96         AppendEntries appendEntries, RaftState raftState){
97
98         // 1. Reply false if term < currentTerm (§5.1)
99         if(appendEntries.getTerm() < currentTerm()){
100             sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
101             return state();
102         }
103
104         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
105         // whose term matches prevLogTerm (§5.3)
106         ReplicatedLogEntry previousEntry = context.getReplicatedLog()
107             .get(appendEntries.getPrevLogIndex());
108
109         if(previousEntry == null || previousEntry.getTerm() != appendEntries.getPrevLogTerm()){
110             sender.tell(new AppendEntriesReply(currentTerm(), false), actor());
111             return state();
112         }
113
114         if(appendEntries.getEntries() != null) {
115             // 3. If an existing entry conflicts with a new one (same index
116             // but different terms), delete the existing entry and all that
117             // follow it (§5.3)
118             int addEntriesFrom = 0;
119             for (int i = 0;
120                  i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
121                 ReplicatedLogEntry newEntry = context.getReplicatedLog()
122                     .get(i + 1);
123
124                 if (newEntry != null && newEntry.getTerm() == appendEntries.getEntries().get(i).getTerm()){
125                     break;
126                 }
127                 if (newEntry != null && newEntry.getTerm() != appendEntries
128                     .getEntries().get(i).getTerm()) {
129                     context.getReplicatedLog().removeFrom(i + 1);
130                     break;
131                 }
132             }
133
134             // 4. Append any new entries not already in the log
135             for (int i = addEntriesFrom;
136                  i < appendEntries.getEntries().size(); i++) {
137                 context.getReplicatedLog()
138                     .append(appendEntries.getEntries().get(i));
139             }
140         }
141
142
143         // 5. If leaderCommit > commitIndex, set commitIndex =
144         // min(leaderCommit, index of last new entry)
145         context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(),
146             context.getReplicatedLog().last().getIndex()));
147
148         // If commitIndex > lastApplied: increment lastApplied, apply
149         // log[lastApplied] to state machine (§5.3)
150         if (appendEntries.getLeaderCommit() > context.getLastApplied()) {
151             applyLogToStateMachine(appendEntries.getLeaderCommit());
152         }
153
154         sender.tell(new AppendEntriesReply(currentTerm(), true), actor());
155
156         return handleAppendEntries(sender, appendEntries, raftState);
157     }
158
159     /**
160      * Derived classes should not directly handle AppendEntriesReply messages it
161      * should let the base class handle it first. Once the base class handles
162      * the AppendEntriesReply message and does the common actions that are
163      * applicable in all RaftState's it will delegate the handling of the
164      * AppendEntriesReply message to the derived class to do more state specific
165      * handling by calling this method
166      *
167      * @param sender             The actor that sent this message
168      * @param appendEntriesReply The AppendEntriesReply message
169      * @param suggestedState     The state that the RaftActor should be in based
170      *                           on the base class's processing of the
171      *                           AppendEntriesReply message
172      * @return
173      */
174
175     protected abstract RaftState handleAppendEntriesReply(ActorRef sender,
176         AppendEntriesReply appendEntriesReply, RaftState suggestedState);
177
178     protected RaftState requestVote(ActorRef sender,
179         RequestVote requestVote, RaftState suggestedState) {
180
181         boolean grantVote = false;
182
183         //  Reply false if term < currentTerm (§5.1)
184         if (requestVote.getTerm() < currentTerm()) {
185             grantVote = false;
186
187             // If votedFor is null or candidateId, and candidate’s log is at
188             // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
189         } else if (votedFor() == null || votedFor()
190             .equals(requestVote.getCandidateId())) {
191
192             boolean candidateLatest = false;
193
194             // From §5.4.1
195             // Raft determines which of two logs is more up-to-date
196             // by comparing the index and term of the last entries in the
197             // logs. If the logs have last entries with different terms, then
198             // the log with the later term is more up-to-date. If the logs
199             // end with the same term, then whichever log is longer is
200             // more up-to-date.
201             if (requestVote.getLastLogTerm() > lastTerm()) {
202                 candidateLatest = true;
203             } else if ((requestVote.getLastLogTerm() == lastTerm())
204                 && requestVote.getLastLogIndex() >= lastTerm()) {
205                 candidateLatest = true;
206             }
207
208             if (candidateLatest) {
209                 grantVote = true;
210                 context.getTermInformation().update(requestVote.getTerm(),
211                     requestVote.getCandidateId());
212             }
213         }
214
215         sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
216
217         return suggestedState;
218     }
219
220     /**
221      * Derived classes should not directly handle RequestVoteReply messages it
222      * should let the base class handle it first. Once the base class handles
223      * the RequestVoteReply message and does the common actions that are
224      * applicable in all RaftState's it will delegate the handling of the
225      * RequestVoteReply message to the derived class to do more state specific
226      * handling by calling this method
227      *
228      * @param sender           The actor that sent this message
229      * @param requestVoteReply The RequestVoteReply message
230      * @param suggestedState   The state that the RaftActor should be in based
231      *                         on the base class's processing of the RequestVote
232      *                         message
233      * @return
234      */
235
236     protected abstract RaftState handleRequestVoteReply(ActorRef sender,
237         RequestVoteReply requestVoteReply, RaftState suggestedState);
238
239     /**
240      * @return The derived class should return the state that corresponds to
241      * it's behavior
242      */
243     protected abstract RaftState state();
244
245     protected FiniteDuration electionDuration() {
246         long variance = new Random().nextInt(ELECTION_TIME_MAX_VARIANCE);
247         return new FiniteDuration(ELECTION_TIME_INTERVAL + variance,
248             TimeUnit.MILLISECONDS);
249     }
250
251     protected void scheduleElection(FiniteDuration interval) {
252
253         if (electionCancel != null && !electionCancel.isCancelled()) {
254             electionCancel.cancel();
255         }
256
257         // Schedule an election. When the scheduler triggers an ElectionTimeout
258         // message is sent to itself
259         electionCancel =
260             context.getActorSystem().scheduler().scheduleOnce(interval,
261                 context.getActor(), new ElectionTimeout(),
262                 context.getActorSystem().dispatcher(), context.getActor());
263     }
264
265     protected long currentTerm() {
266         return context.getTermInformation().getCurrentTerm();
267     }
268
269     protected String votedFor() {
270         return context.getTermInformation().getVotedFor();
271     }
272
273     protected ActorRef actor() {
274         return context.getActor();
275     }
276
277     protected long lastTerm() {
278         return context.getReplicatedLog().last().getTerm();
279     }
280
281     protected long lastIndex() {
282         return context.getReplicatedLog().last().getIndex();
283     }
284
285
286     @Override
287     public RaftState handleMessage(ActorRef sender, Object message) {
288         RaftState raftState = state();
289         if (message instanceof RaftRPC) {
290             raftState = applyTerm((RaftRPC) message);
291         }
292         if (message instanceof AppendEntries) {
293             raftState = appendEntries(sender, (AppendEntries) message,
294                 raftState);
295         } else if (message instanceof AppendEntriesReply) {
296             raftState =
297                 handleAppendEntriesReply(sender, (AppendEntriesReply) message,
298                     raftState);
299         } else if (message instanceof RequestVote) {
300             raftState =
301                 requestVote(sender, (RequestVote) message, raftState);
302         } else if (message instanceof RequestVoteReply) {
303             raftState =
304                 handleRequestVoteReply(sender, (RequestVoteReply) message,
305                     raftState);
306         }
307         return raftState;
308     }
309
310     private RaftState applyTerm(RaftRPC rpc) {
311         // If RPC request or response contains term T > currentTerm:
312         // set currentTerm = T, convert to follower (§5.1)
313         // This applies to all RPC messages and responses
314         if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
315             context.getTermInformation().update(rpc.getTerm(), null);
316             return RaftState.Follower;
317         }
318         return state();
319     }
320
321     private void applyLogToStateMachine(long index) {
322         // Send a local message to the local RaftActor (it's derived class to be
323         // specific to apply the log to it's index)
324         context.setLastApplied(index);
325     }
326
327
328 }