eed74bba82fbf8a4f8e4c1b5431887451c49d59d
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import akka.event.LoggingAdapter;
14 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
15 import org.opendaylight.controller.cluster.raft.RaftActorContext;
16 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
17 import org.opendaylight.controller.cluster.raft.SerializationUtils;
18 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
19 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
20 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
22 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
24 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
25 import scala.concurrent.duration.FiniteDuration;
26
27 import java.util.Random;
28 import java.util.concurrent.TimeUnit;
29
30 /**
31  * Abstract class that represents the behavior of a RaftActor
32  * <p/>
33  * All Servers:
34  * <ul>
35  * <li> If commitIndex > lastApplied: increment lastApplied, apply
36  * log[lastApplied] to state machine (§5.3)
37  * <li> If RPC request or response contains term T > currentTerm:
38  * set currentTerm = T, convert to follower (§5.1)
39  */
40 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
41
42     /**
43      * Information about the RaftActor whose behavior this class represents
44      */
45     protected final RaftActorContext context;
46
47     /**
48      *
49      */
50     protected final LoggingAdapter LOG;
51
52     /**
53      *
54      */
55     private Cancellable electionCancel = null;
56
57     /**
58      *
59      */
60     protected String leaderId = null;
61
62
63     protected AbstractRaftActorBehavior(RaftActorContext context) {
64         this.context = context;
65         this.LOG = context.getLogger();
66     }
67
68     /**
69      * Derived classes should not directly handle AppendEntries messages it
70      * should let the base class handle it first. Once the base class handles
71      * the AppendEntries message and does the common actions that are applicable
72      * in all RaftState's it will delegate the handling of the AppendEntries
73      * message to the derived class to do more state specific handling by calling
74      * this method
75      *
76      * @param sender         The actor that sent this message
77      * @param appendEntries  The AppendEntries message
78      * @return
79      */
80     protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
81         AppendEntries appendEntries);
82
83
84     /**
85      * appendEntries first processes the AppendEntries message and then
86      * delegates handling to a specific behavior
87      *
88      * @param sender
89      * @param appendEntries
90      * @return
91      */
92     protected RaftActorBehavior appendEntries(ActorRef sender,
93         AppendEntries appendEntries) {
94
95         // 1. Reply false if term < currentTerm (§5.1)
96         if (appendEntries.getTerm() < currentTerm()) {
97             if(LOG.isDebugEnabled()) {
98                 LOG.debug("Cannot append entries because sender term {} is less than {}",
99                         appendEntries.getTerm(), currentTerm());
100             }
101
102             sender.tell(
103                 new AppendEntriesReply(context.getId(), currentTerm(), false,
104                     lastIndex(), lastTerm()), actor()
105             );
106             return this;
107         }
108
109
110         return handleAppendEntries(sender, appendEntries);
111     }
112
113     /**
114      * Derived classes should not directly handle AppendEntriesReply messages it
115      * should let the base class handle it first. Once the base class handles
116      * the AppendEntriesReply message and does the common actions that are
117      * applicable in all RaftState's it will delegate the handling of the
118      * AppendEntriesReply message to the derived class to do more state specific
119      * handling by calling this method
120      *
121      * @param sender             The actor that sent this message
122      * @param appendEntriesReply The AppendEntriesReply message
123      * @return
124      */
125     protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
126         AppendEntriesReply appendEntriesReply);
127
128     /**
129      * requestVote handles the RequestVote message. This logic is common
130      * for all behaviors
131      *
132      * @param sender
133      * @param requestVote
134      * @return
135      */
136     protected RaftActorBehavior requestVote(ActorRef sender,
137         RequestVote requestVote) {
138
139         if(LOG.isDebugEnabled()) {
140             LOG.debug(requestVote.toString());
141         }
142
143         boolean grantVote = false;
144
145         //  Reply false if term < currentTerm (§5.1)
146         if (requestVote.getTerm() < currentTerm()) {
147             grantVote = false;
148
149             // If votedFor is null or candidateId, and candidate’s log is at
150             // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
151         } else if (votedFor() == null || votedFor()
152             .equals(requestVote.getCandidateId())) {
153
154             boolean candidateLatest = false;
155
156             // From §5.4.1
157             // Raft determines which of two logs is more up-to-date
158             // by comparing the index and term of the last entries in the
159             // logs. If the logs have last entries with different terms, then
160             // the log with the later term is more up-to-date. If the logs
161             // end with the same term, then whichever log is longer is
162             // more up-to-date.
163             if (requestVote.getLastLogTerm() > lastTerm()) {
164                 candidateLatest = true;
165             } else if ((requestVote.getLastLogTerm() == lastTerm())
166                 && requestVote.getLastLogIndex() >= lastIndex()) {
167                 candidateLatest = true;
168             }
169
170             if (candidateLatest) {
171                 grantVote = true;
172                 context.getTermInformation().updateAndPersist(requestVote.getTerm(),
173                     requestVote.getCandidateId());
174             }
175         }
176
177         sender.tell(new RequestVoteReply(currentTerm(), grantVote), actor());
178
179         return this;
180     }
181
182     /**
183      * Derived classes should not directly handle RequestVoteReply messages it
184      * should let the base class handle it first. Once the base class handles
185      * the RequestVoteReply message and does the common actions that are
186      * applicable in all RaftState's it will delegate the handling of the
187      * RequestVoteReply message to the derived class to do more state specific
188      * handling by calling this method
189      *
190      * @param sender           The actor that sent this message
191      * @param requestVoteReply The RequestVoteReply message
192      * @return
193      */
194     protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
195         RequestVoteReply requestVoteReply);
196
197     /**
198      * Creates a random election duration
199      *
200      * @return
201      */
202     protected FiniteDuration electionDuration() {
203         long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
204         return context.getConfigParams().getElectionTimeOutInterval().$plus(
205             new FiniteDuration(variance, TimeUnit.MILLISECONDS));
206     }
207
208     /**
209      * stop the scheduled election
210      */
211     protected void stopElection() {
212         if (electionCancel != null && !electionCancel.isCancelled()) {
213             electionCancel.cancel();
214         }
215     }
216
217     /**
218      * schedule a new election
219      *
220      * @param interval
221      */
222     protected void scheduleElection(FiniteDuration interval) {
223         stopElection();
224
225         // Schedule an election. When the scheduler triggers an ElectionTimeout
226         // message is sent to itself
227         electionCancel =
228             context.getActorSystem().scheduler().scheduleOnce(interval,
229                 context.getActor(), new ElectionTimeout(),
230                 context.getActorSystem().dispatcher(), context.getActor());
231     }
232
233     /**
234      * Get the current term
235      * @return
236      */
237     protected long currentTerm() {
238         return context.getTermInformation().getCurrentTerm();
239     }
240
241     /**
242      * Get the candidate for whom we voted in the current term
243      * @return
244      */
245     protected String votedFor() {
246         return context.getTermInformation().getVotedFor();
247     }
248
249     /**
250      * Get the actor associated with this behavior
251      * @return
252      */
253     protected ActorRef actor() {
254         return context.getActor();
255     }
256
257     /**
258      * Get the term from the last entry in the log
259      *
260      * @return
261      */
262     protected long lastTerm() {
263         return context.getReplicatedLog().lastTerm();
264     }
265
266     /**
267      * Get the index from the last entry in the log
268      *
269      * @return
270      */
271     protected long lastIndex() {
272         return context.getReplicatedLog().lastIndex();
273     }
274
275     /**
276      * Find the client request tracker for a specific logIndex
277      *
278      * @param logIndex
279      * @return
280      */
281     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
282         return null;
283     }
284
285     /**
286      * Find the client request tracker for a specific logIndex
287      *
288      * @param logIndex
289      * @return
290      */
291     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
292         return null;
293     }
294
295
296     /**
297      * Find the log index from the previous to last entry in the log
298      *
299      * @return
300      */
301     protected long prevLogIndex(long index){
302         ReplicatedLogEntry prevEntry =
303             context.getReplicatedLog().get(index - 1);
304         if (prevEntry != null) {
305             return prevEntry.getIndex();
306         }
307         return -1;
308     }
309
310     /**
311      * Find the log term from the previous to last entry in the log
312      * @return
313      */
314     protected long prevLogTerm(long index){
315         ReplicatedLogEntry prevEntry =
316             context.getReplicatedLog().get(index - 1);
317         if (prevEntry != null) {
318             return prevEntry.getTerm();
319         }
320         return -1;
321     }
322
323     /**
324      * Apply the provided index to the state machine
325      *
326      * @param index a log index that is known to be committed
327      */
328     protected void applyLogToStateMachine(final long index) {
329         long newLastApplied = context.getLastApplied();
330         // Now maybe we apply to the state machine
331         for (long i = context.getLastApplied() + 1;
332              i < index + 1; i++) {
333             ActorRef clientActor = null;
334             String identifier = null;
335             ClientRequestTracker tracker = removeClientRequestTracker(i);
336
337             if (tracker != null) {
338                 clientActor = tracker.getClientActor();
339                 identifier = tracker.getIdentifier();
340             }
341             ReplicatedLogEntry replicatedLogEntry =
342                 context.getReplicatedLog().get(i);
343
344             if (replicatedLogEntry != null) {
345                 // Send a local message to the local RaftActor (it's derived class to be
346                 // specific to apply the log to it's index)
347                 actor().tell(new ApplyState(clientActor, identifier,
348                     replicatedLogEntry), actor());
349                 newLastApplied = i;
350             } else {
351                 //if one index is not present in the log, no point in looping
352                 // around as the rest wont be present either
353                 LOG.warning(
354                         "Missing index {} from log. Cannot apply state. Ignoring {} to {}", i, i, index);
355                 break;
356             }
357         }
358         if(LOG.isDebugEnabled()) {
359             LOG.debug("Setting last applied to {}", newLastApplied);
360         }
361         context.setLastApplied(newLastApplied);
362
363         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
364         // will be used during recovery
365         //in case if the above code throws an error and this message is not sent, it would be fine
366         // as the  append entries received later would initiate add this message to the journal
367         actor().tell(new ApplyLogEntries((int) context.getLastApplied()), actor());
368     }
369
370     protected Object fromSerializableMessage(Object serializable){
371         return SerializationUtils.fromSerializable(serializable);
372     }
373
374     @Override
375     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
376         if (message instanceof AppendEntries) {
377             return appendEntries(sender, (AppendEntries) message);
378         } else if (message instanceof AppendEntriesReply) {
379             return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
380         } else if (message instanceof RequestVote) {
381             return requestVote(sender, (RequestVote) message);
382         } else if (message instanceof RequestVoteReply) {
383             return handleRequestVoteReply(sender, (RequestVoteReply) message);
384         }
385         return this;
386     }
387
388     @Override public String getLeaderId() {
389         return leaderId;
390     }
391
392     protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
393         LOG.info("Switching from behavior {} to {}", this.state(), behavior.state());
394         try {
395             close();
396         } catch (Exception e) {
397             LOG.error(e, "Failed to close behavior : {}", this.state());
398         }
399
400         return behavior;
401     }
402 }