a1bcf8541c4c26b260be916583eb031e29687897
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Cancellable;
13 import java.util.Random;
14 import java.util.concurrent.TimeUnit;
15 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
16 import org.opendaylight.controller.cluster.raft.RaftActorContext;
17 import org.opendaylight.controller.cluster.raft.RaftState;
18 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
19 import org.opendaylight.controller.cluster.raft.SerializationUtils;
20 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
22 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
23 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
24 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
25 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
26 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
27 import org.slf4j.Logger;
28 import scala.concurrent.duration.FiniteDuration;
29
30 /**
31  * Abstract class that represents the behavior of a RaftActor
32  * <p/>
33  * All Servers:
34  * <ul>
35  * <li> If commitIndex > lastApplied: increment lastApplied, apply
36  * log[lastApplied] to state machine (§5.3)
37  * <li> If RPC request or response contains term T > currentTerm:
38  * set currentTerm = T, convert to follower (§5.1)
39  */
40 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
41
42     /**
43      * Information about the RaftActor whose behavior this class represents
44      */
45     protected final RaftActorContext context;
46
47     /**
48      *
49      */
50     protected final Logger LOG;
51
52     /**
53      *
54      */
55     private Cancellable electionCancel = null;
56
57     /**
58      *
59      */
60     protected String leaderId = null;
61
62     private long replicatedToAllIndex = -1;
63
64     private final String logName;
65
66     private final RaftState state;
67
68     protected AbstractRaftActorBehavior(RaftActorContext context, RaftState state) {
69         this.context = context;
70         this.state = state;
71         this.LOG = context.getLogger();
72
73         logName = String.format("%s (%s)", context.getId(), state);
74     }
75
76     @Override
77     public RaftState state() {
78         return state;
79     }
80
81     public String logName() {
82         return logName;
83     }
84
85     @Override
86     public void setReplicatedToAllIndex(long replicatedToAllIndex) {
87         this.replicatedToAllIndex = replicatedToAllIndex;
88     }
89
90     @Override
91     public long getReplicatedToAllIndex() {
92         return replicatedToAllIndex;
93     }
94
95     /**
96      * Derived classes should not directly handle AppendEntries messages it
97      * should let the base class handle it first. Once the base class handles
98      * the AppendEntries message and does the common actions that are applicable
99      * in all RaftState's it will delegate the handling of the AppendEntries
100      * message to the derived class to do more state specific handling by calling
101      * this method
102      *
103      * @param sender         The actor that sent this message
104      * @param appendEntries  The AppendEntries message
105      * @return
106      */
107     protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
108         AppendEntries appendEntries);
109
110
111     /**
112      * appendEntries first processes the AppendEntries message and then
113      * delegates handling to a specific behavior
114      *
115      * @param sender
116      * @param appendEntries
117      * @return
118      */
119     protected RaftActorBehavior appendEntries(ActorRef sender,
120         AppendEntries appendEntries) {
121
122         // 1. Reply false if term < currentTerm (§5.1)
123         if (appendEntries.getTerm() < currentTerm()) {
124             if(LOG.isDebugEnabled()) {
125                 LOG.debug("{}: Cannot append entries because sender term {} is less than {}",
126                         logName(), appendEntries.getTerm(), currentTerm());
127             }
128
129             sender.tell(
130                 new AppendEntriesReply(context.getId(), currentTerm(), false,
131                     lastIndex(), lastTerm()), actor()
132             );
133             return this;
134         }
135
136
137         return handleAppendEntries(sender, appendEntries);
138     }
139
140     /**
141      * Derived classes should not directly handle AppendEntriesReply messages it
142      * should let the base class handle it first. Once the base class handles
143      * the AppendEntriesReply message and does the common actions that are
144      * applicable in all RaftState's it will delegate the handling of the
145      * AppendEntriesReply message to the derived class to do more state specific
146      * handling by calling this method
147      *
148      * @param sender             The actor that sent this message
149      * @param appendEntriesReply The AppendEntriesReply message
150      * @return
151      */
152     protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
153         AppendEntriesReply appendEntriesReply);
154
155     /**
156      * requestVote handles the RequestVote message. This logic is common
157      * for all behaviors
158      *
159      * @param sender
160      * @param requestVote
161      * @return
162      */
163     protected RaftActorBehavior requestVote(ActorRef sender, RequestVote requestVote) {
164
165         LOG.debug("{}: In requestVote:  {}", logName(), requestVote);
166
167         boolean grantVote = false;
168
169         //  Reply false if term < currentTerm (§5.1)
170         if (requestVote.getTerm() < currentTerm()) {
171             grantVote = false;
172
173             // If votedFor is null or candidateId, and candidate’s log is at
174             // least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
175         } else if (votedFor() == null || votedFor()
176             .equals(requestVote.getCandidateId())) {
177
178             boolean candidateLatest = false;
179
180             // From §5.4.1
181             // Raft determines which of two logs is more up-to-date
182             // by comparing the index and term of the last entries in the
183             // logs. If the logs have last entries with different terms, then
184             // the log with the later term is more up-to-date. If the logs
185             // end with the same term, then whichever log is longer is
186             // more up-to-date.
187             if (requestVote.getLastLogTerm() > lastTerm()) {
188                 candidateLatest = true;
189             } else if ((requestVote.getLastLogTerm() == lastTerm())
190                 && requestVote.getLastLogIndex() >= lastIndex()) {
191                 candidateLatest = true;
192             }
193
194             if (candidateLatest) {
195                 grantVote = true;
196                 context.getTermInformation().updateAndPersist(requestVote.getTerm(),
197                     requestVote.getCandidateId());
198             }
199         }
200
201         RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
202
203         LOG.debug("{}: requestVote returning: {}", logName(), reply);
204
205         sender.tell(reply, actor());
206
207         return this;
208     }
209
210     /**
211      * Derived classes should not directly handle RequestVoteReply messages it
212      * should let the base class handle it first. Once the base class handles
213      * the RequestVoteReply message and does the common actions that are
214      * applicable in all RaftState's it will delegate the handling of the
215      * RequestVoteReply message to the derived class to do more state specific
216      * handling by calling this method
217      *
218      * @param sender           The actor that sent this message
219      * @param requestVoteReply The RequestVoteReply message
220      * @return
221      */
222     protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
223         RequestVoteReply requestVoteReply);
224
225     /**
226      * Creates a random election duration
227      *
228      * @return
229      */
230     protected FiniteDuration electionDuration() {
231         long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
232         return context.getConfigParams().getElectionTimeOutInterval().$plus(
233                 new FiniteDuration(variance, TimeUnit.MILLISECONDS));
234     }
235
236     /**
237      * stop the scheduled election
238      */
239     protected void stopElection() {
240         if (electionCancel != null && !electionCancel.isCancelled()) {
241             electionCancel.cancel();
242         }
243     }
244
245     /**
246      * schedule a new election
247      *
248      * @param interval
249      */
250     protected void scheduleElection(FiniteDuration interval) {
251         stopElection();
252
253         // Schedule an election. When the scheduler triggers an ElectionTimeout
254         // message is sent to itself
255         electionCancel =
256             context.getActorSystem().scheduler().scheduleOnce(interval,
257                 context.getActor(), new ElectionTimeout(),
258                 context.getActorSystem().dispatcher(), context.getActor());
259     }
260
261     /**
262      * Get the current term
263      * @return
264      */
265     protected long currentTerm() {
266         return context.getTermInformation().getCurrentTerm();
267     }
268
269     /**
270      * Get the candidate for whom we voted in the current term
271      * @return
272      */
273     protected String votedFor() {
274         return context.getTermInformation().getVotedFor();
275     }
276
277     /**
278      * Get the actor associated with this behavior
279      * @return
280      */
281     protected ActorRef actor() {
282         return context.getActor();
283     }
284
285     /**
286      * Get the term from the last entry in the log
287      *
288      * @return
289      */
290     protected long lastTerm() {
291         return context.getReplicatedLog().lastTerm();
292     }
293
294     /**
295      * Get the index from the last entry in the log
296      *
297      * @return
298      */
299     protected long lastIndex() {
300         return context.getReplicatedLog().lastIndex();
301     }
302
303     /**
304      * Find the client request tracker for a specific logIndex
305      *
306      * @param logIndex
307      * @return
308      */
309     protected ClientRequestTracker findClientRequestTracker(long logIndex) {
310         return null;
311     }
312
313     /**
314      * Find the client request tracker for a specific logIndex
315      *
316      * @param logIndex
317      * @return
318      */
319     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
320         return null;
321     }
322
323
324     /**
325      * Find the log index from the previous to last entry in the log
326      *
327      * @return
328      */
329     protected long prevLogIndex(long index){
330         ReplicatedLogEntry prevEntry =
331             context.getReplicatedLog().get(index - 1);
332         if (prevEntry != null) {
333             return prevEntry.getIndex();
334         }
335         return -1;
336     }
337
338     /**
339      * Find the log term from the previous to last entry in the log
340      * @return
341      */
342     protected long prevLogTerm(long index){
343         ReplicatedLogEntry prevEntry =
344             context.getReplicatedLog().get(index - 1);
345         if (prevEntry != null) {
346             return prevEntry.getTerm();
347         }
348         return -1;
349     }
350
351     /**
352      * Apply the provided index to the state machine
353      *
354      * @param index a log index that is known to be committed
355      */
356     protected void applyLogToStateMachine(final long index) {
357         long newLastApplied = context.getLastApplied();
358         // Now maybe we apply to the state machine
359         for (long i = context.getLastApplied() + 1;
360              i < index + 1; i++) {
361             ActorRef clientActor = null;
362             String identifier = null;
363             ClientRequestTracker tracker = removeClientRequestTracker(i);
364
365             if (tracker != null) {
366                 clientActor = tracker.getClientActor();
367                 identifier = tracker.getIdentifier();
368             }
369             ReplicatedLogEntry replicatedLogEntry =
370                 context.getReplicatedLog().get(i);
371
372             if (replicatedLogEntry != null) {
373                 // Send a local message to the local RaftActor (it's derived class to be
374                 // specific to apply the log to it's index)
375                 actor().tell(new ApplyState(clientActor, identifier,
376                     replicatedLogEntry), actor());
377                 newLastApplied = i;
378             } else {
379                 //if one index is not present in the log, no point in looping
380                 // around as the rest wont be present either
381                 LOG.warn(
382                         "{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
383                         logName(), i, i, index);
384                 break;
385             }
386         }
387         if(LOG.isDebugEnabled()) {
388             LOG.debug("{}: Setting last applied to {}", logName(), newLastApplied);
389         }
390         context.setLastApplied(newLastApplied);
391
392         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
393         // will be used during recovery
394         //in case if the above code throws an error and this message is not sent, it would be fine
395         // as the  append entries received later would initiate add this message to the journal
396         actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
397     }
398
399     protected Object fromSerializableMessage(Object serializable){
400         return SerializationUtils.fromSerializable(serializable);
401     }
402
403     @Override
404     public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
405         if (message instanceof AppendEntries) {
406             return appendEntries(sender, (AppendEntries) message);
407         } else if (message instanceof AppendEntriesReply) {
408             return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
409         } else if (message instanceof RequestVote) {
410             return requestVote(sender, (RequestVote) message);
411         } else if (message instanceof RequestVoteReply) {
412             return handleRequestVoteReply(sender, (RequestVoteReply) message);
413         }
414         return this;
415     }
416
417     @Override public String getLeaderId() {
418         return leaderId;
419     }
420
421     protected RaftActorBehavior switchBehavior(RaftActorBehavior behavior) {
422         LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), behavior.state());
423         try {
424             close();
425         } catch (Exception e) {
426             LOG.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
427         }
428
429         return behavior;
430     }
431
432     protected int getMajorityVoteCount(int numPeers) {
433         // Votes are required from a majority of the peers including self.
434         // The numMajority field therefore stores a calculated value
435         // of the number of votes required for this candidate to win an
436         // election based on it's known peers.
437         // If a peer was added during normal operation and raft replicas
438         // came to know about them then the new peer would also need to be
439         // taken into consideration when calculating this value.
440         // Here are some examples for what the numMajority would be for n
441         // peers
442         // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
443         // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
444         // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
445
446         int numMajority = 0;
447         if (numPeers > 0) {
448             int self = 1;
449             numMajority = (numPeers + self) / 2 + 1;
450         }
451         return numMajority;
452
453     }
454
455
456     /**
457      * Performs a snapshot with no capture on the replicated log.
458      * It clears the log from the supplied index or last-applied-1 which ever is minimum.
459      *
460      * @param snapshotCapturedIndex
461      */
462     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
463         //  we would want to keep the lastApplied as its used while capturing snapshots
464         long lastApplied = context.getLastApplied();
465         long tempMin = Math.min(snapshotCapturedIndex, (lastApplied > -1 ? lastApplied - 1 : -1));
466
467         if(LOG.isTraceEnabled()) {
468             LOG.trace("{}: performSnapshotWithoutCapture: snapshotCapturedIndex: {}, lastApplied: {}, tempMin: {}",
469                     logName, snapshotCapturedIndex, lastApplied, tempMin);
470         }
471
472         if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin))  {
473             LOG.debug("{}: fakeSnapshot purging log to {} for term {}", logName(), tempMin,
474                     context.getTermInformation().getCurrentTerm());
475
476             //use the term of the temp-min, since we check for isPresent, entry will not be null
477             ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
478             context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
479             context.getReplicatedLog().snapshotCommit();
480             setReplicatedToAllIndex(tempMin);
481         } else if(tempMin > getReplicatedToAllIndex()) {
482             // It's possible a follower was lagging and an install snapshot advanced its match index past
483             // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
484             // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
485             // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
486             // trim the log to the last applied index even if previous entries weren't replicated to all followers.
487             setReplicatedToAllIndex(tempMin);
488         }
489     }
490
491     protected String getId(){
492         return context.getId();
493     }
494
495 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.