Do not update term from unreachable members
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractRaftActorBehavior.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.cluster.Cluster;
15 import akka.cluster.Member;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.Optional;
18 import java.util.Random;
19 import java.util.Set;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
22 import org.opendaylight.controller.cluster.raft.RaftActorContext;
23 import org.opendaylight.controller.cluster.raft.RaftState;
24 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
26 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
28 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
29 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
31 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
32 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
33 import org.slf4j.Logger;
34 import scala.concurrent.duration.FiniteDuration;
35
36 /**
37  * Abstract class that provides common code for a RaftActor behavior.
38  */
39 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
40     /**
41      * Information about the RaftActor whose behavior this class represents.
42      */
43     protected final RaftActorContext context;
44
45     /**
46      * Used for message logging.
47      */
48     @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
49     protected final Logger log;
50
51     /**
52      * Prepended to log messages to provide appropriate context.
53      */
54     private final String logName;
55
56     /**
57      * The RaftState corresponding to his behavior.
58      */
59     private final RaftState state;
60
61     /**
62      * Used to cancel a scheduled election.
63      */
64     private Cancellable electionCancel = null;
65
66     /**
67      * The index of the last log entry that has been replicated to all raft peers.
68      */
69     private long replicatedToAllIndex = -1;
70
71     AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
72         this.context = requireNonNull(context);
73         this.state = requireNonNull(state);
74         this.log = context.getLogger();
75
76         logName = String.format("%s (%s)", context.getId(), state);
77     }
78
79     public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) {
80         switch (state) {
81             case Candidate:
82                 return new Candidate(context);
83             case Follower:
84                 return new Follower(context);
85             case IsolatedLeader:
86                 return new IsolatedLeader(context);
87             case Leader:
88                 return new Leader(context);
89             case PreLeader:
90                 return new PreLeader(context);
91             default:
92                 throw new IllegalArgumentException("Unhandled state " + state);
93         }
94     }
95
96     @Override
97     public final RaftState state() {
98         return state;
99     }
100
101     protected final String logName() {
102         return logName;
103     }
104
105     @Override
106     public void setReplicatedToAllIndex(final long replicatedToAllIndex) {
107         this.replicatedToAllIndex = replicatedToAllIndex;
108     }
109
110     @Override
111     public long getReplicatedToAllIndex() {
112         return replicatedToAllIndex;
113     }
114
115     /**
116      * Derived classes should not directly handle AppendEntries messages it
117      * should let the base class handle it first. Once the base class handles
118      * the AppendEntries message and does the common actions that are applicable
119      * in all RaftState's it will delegate the handling of the AppendEntries
120      * message to the derived class to do more state specific handling by calling
121      * this method
122      *
123      * @param sender         The actor that sent this message
124      * @param appendEntries  The AppendEntries message
125      * @return a new behavior if it was changed or the current behavior
126      */
127     protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
128         AppendEntries appendEntries);
129
130     /**
131      * Handles the common logic for the AppendEntries message and delegates handling to the derived class.
132      *
133      * @param sender the ActorRef that sent the message
134      * @param appendEntries the message
135      * @return a new behavior if it was changed or the current behavior
136      */
137     protected RaftActorBehavior appendEntries(final ActorRef sender, final AppendEntries appendEntries) {
138
139         // 1. Reply false if term < currentTerm (ยง5.1)
140         if (appendEntries.getTerm() < currentTerm()) {
141             log.info("{}: Cannot append entries because sender's term {} is less than {}", logName(),
142                     appendEntries.getTerm(), currentTerm());
143
144             sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
145                     context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor());
146             return this;
147         }
148
149
150         return handleAppendEntries(sender, appendEntries);
151     }
152
153     /**
154      * Derived classes should not directly handle AppendEntriesReply messages it
155      * should let the base class handle it first. Once the base class handles
156      * the AppendEntriesReply message and does the common actions that are
157      * applicable in all RaftState's it will delegate the handling of the
158      * AppendEntriesReply message to the derived class to do more state specific
159      * handling by calling this method
160      *
161      * @param sender             The actor that sent this message
162      * @param appendEntriesReply The AppendEntriesReply message
163      * @return a new behavior if it was changed or the current behavior
164      */
165     protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
166         AppendEntriesReply appendEntriesReply);
167
168     /**
169      * Handles the logic for the RequestVote message that is common for all behaviors.
170      *
171      * @param sender the ActorRef that sent the message
172      * @param requestVote the message
173      * @return a new behavior if it was changed or the current behavior
174      */
175     protected RaftActorBehavior requestVote(final ActorRef sender, final RequestVote requestVote) {
176
177         log.debug("{}: In requestVote:  {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName(),
178                 requestVote, currentTerm(), votedFor(), lastIndex(), lastTerm());
179
180         boolean grantVote = canGrantVote(requestVote);
181
182         if (grantVote) {
183             context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId());
184         }
185
186         RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
187
188         log.debug("{}: requestVote returning: {}", logName(), reply);
189
190         sender.tell(reply, actor());
191
192         return this;
193     }
194
195     protected boolean canGrantVote(final RequestVote requestVote) {
196         boolean grantVote = false;
197
198         //  Reply false if term < currentTerm (ยง5.1)
199         if (requestVote.getTerm() < currentTerm()) {
200             grantVote = false;
201
202             // If votedFor is null or candidateId, and candidateโ€™s log is at
203             // least as up-to-date as receiverโ€™s log, grant vote (ยง5.2, ยง5.4)
204         } else if (votedFor() == null || votedFor()
205                 .equals(requestVote.getCandidateId())) {
206
207             boolean candidateLatest = false;
208
209             // From ยง5.4.1
210             // Raft determines which of two logs is more up-to-date
211             // by comparing the index and term of the last entries in the
212             // logs. If the logs have last entries with different terms, then
213             // the log with the later term is more up-to-date. If the logs
214             // end with the same term, then whichever log is longer is
215             // more up-to-date.
216             if (requestVote.getLastLogTerm() > lastTerm()) {
217                 candidateLatest = true;
218             } else if (requestVote.getLastLogTerm() == lastTerm()
219                     && requestVote.getLastLogIndex() >= lastIndex()) {
220                 candidateLatest = true;
221             }
222
223             if (candidateLatest) {
224                 grantVote = true;
225             }
226         }
227         return grantVote;
228     }
229
230     /**
231      * Derived classes should not directly handle RequestVoteReply messages it
232      * should let the base class handle it first. Once the base class handles
233      * the RequestVoteReply message and does the common actions that are
234      * applicable in all RaftState's it will delegate the handling of the
235      * RequestVoteReply message to the derived class to do more state specific
236      * handling by calling this method
237      *
238      * @param sender           The actor that sent this message
239      * @param requestVoteReply The RequestVoteReply message
240      * @return a new behavior if it was changed or the current behavior
241      */
242     protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
243         RequestVoteReply requestVoteReply);
244
245     /**
246      * Returns a duration for election with an additional variance for randomness.
247      *
248      * @return a random election duration
249      */
250     protected FiniteDuration electionDuration() {
251         long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
252         return context.getConfigParams().getElectionTimeOutInterval().$plus(
253                 new FiniteDuration(variance, TimeUnit.MILLISECONDS));
254     }
255
256     /**
257      * Stops the currently scheduled election.
258      */
259     protected void stopElection() {
260         if (electionCancel != null && !electionCancel.isCancelled()) {
261             electionCancel.cancel();
262         }
263     }
264
265     protected boolean canStartElection() {
266         return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember();
267     }
268
269     /**
270      * Schedule a new election.
271      *
272      * @param interval the duration after which we should trigger a new election
273      */
274     protected void scheduleElection(final FiniteDuration interval) {
275         stopElection();
276
277         // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
278         electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
279                 ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor());
280     }
281
282     /**
283      * Returns the current election term.
284      *
285      * @return the current term
286      */
287     protected long currentTerm() {
288         return context.getTermInformation().getCurrentTerm();
289     }
290
291     /**
292      * Returns the id of the candidate that this server voted for in current term.
293      *
294      * @return the candidate for whom we voted in the current term
295      */
296     protected String votedFor() {
297         return context.getTermInformation().getVotedFor();
298     }
299
300     /**
301      * Returns the actor associated with this behavior.
302      *
303      * @return the actor
304      */
305     protected ActorRef actor() {
306         return context.getActor();
307     }
308
309     /**
310      * Returns the term of the last entry in the log.
311      *
312      * @return the term
313      */
314     protected long lastTerm() {
315         return context.getReplicatedLog().lastTerm();
316     }
317
318     /**
319      * Returns the index of the last entry in the log.
320      *
321      * @return the index
322      */
323     protected long lastIndex() {
324         return context.getReplicatedLog().lastIndex();
325     }
326
327     /**
328      * Removes and returns the ClientRequestTracker for the specified log index.
329      * @param logIndex the log index
330      * @return the ClientRequestTracker or null if none available
331      */
332     protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
333         return null;
334     }
335
336     /**
337      * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
338      *
339      * @return the log entry index or -1 if not found
340      */
341     protected long getLogEntryIndex(final long index) {
342         if (index == context.getReplicatedLog().getSnapshotIndex()) {
343             return context.getReplicatedLog().getSnapshotIndex();
344         }
345
346         ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
347         if (entry != null) {
348             return entry.getIndex();
349         }
350
351         return -1;
352     }
353
354     /**
355      * Returns the actual term of the entry in the replicated log for the given index or -1 if not found.
356      *
357      * @return the log entry term or -1 if not found
358      */
359     protected long getLogEntryTerm(final long index) {
360         if (index == context.getReplicatedLog().getSnapshotIndex()) {
361             return context.getReplicatedLog().getSnapshotTerm();
362         }
363
364         ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
365         if (entry != null) {
366             return entry.getTerm();
367         }
368
369         return -1;
370     }
371
372     /**
373      * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the
374      * snapshot term if the given index is in the snapshot or -1 otherwise.
375      *
376      * @return the term or -1 otherwise
377      */
378     protected long getLogEntryOrSnapshotTerm(final long index) {
379         if (context.getReplicatedLog().isInSnapshot(index)) {
380             return context.getReplicatedLog().getSnapshotTerm();
381         }
382
383         return getLogEntryTerm(index);
384     }
385
386     /**
387      * Applies the log entries up to the specified index that is known to be committed to the state machine.
388      *
389      * @param index the log index
390      */
391     protected void applyLogToStateMachine(final long index) {
392         // Now maybe we apply to the state machine
393         for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
394
395             ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
396             if (replicatedLogEntry != null) {
397                 // Send a local message to the local RaftActor (it's derived class to be
398                 // specific to apply the log to it's index)
399
400                 final ApplyState applyState;
401                 final ClientRequestTracker tracker = removeClientRequestTracker(i);
402                 if (tracker != null) {
403                     applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
404                 } else {
405                     applyState = new ApplyState(null, null, replicatedLogEntry);
406                 }
407
408                 log.debug("{}: Setting last applied to {}", logName(), i);
409
410                 context.setLastApplied(i);
411                 context.getApplyStateConsumer().accept(applyState);
412             } else {
413                 //if one index is not present in the log, no point in looping
414                 // around as the rest wont be present either
415                 log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
416                         logName(), i, i, index);
417                 break;
418             }
419         }
420
421         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
422         // will be used during recovery
423         //in case if the above code throws an error and this message is not sent, it would be fine
424         // as the  append entries received later would initiate add this message to the journal
425         actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
426     }
427
428     @Override
429     public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
430         if (message instanceof AppendEntries) {
431             return appendEntries(sender, (AppendEntries) message);
432         } else if (message instanceof AppendEntriesReply) {
433             return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
434         } else if (message instanceof RequestVote) {
435             return requestVote(sender, (RequestVote) message);
436         } else if (message instanceof RequestVoteReply) {
437             return handleRequestVoteReply(sender, (RequestVoteReply) message);
438         } else {
439             return null;
440         }
441     }
442
443     @Override
444     public RaftActorBehavior switchBehavior(final RaftActorBehavior behavior) {
445         return internalSwitchBehavior(behavior);
446     }
447
448     protected RaftActorBehavior internalSwitchBehavior(final RaftState newState) {
449         return internalSwitchBehavior(createBehavior(context, newState));
450     }
451
452     @SuppressWarnings("checkstyle:IllegalCatch")
453     protected RaftActorBehavior internalSwitchBehavior(final RaftActorBehavior newBehavior) {
454         if (!context.getRaftPolicy().automaticElectionsEnabled()) {
455             return this;
456         }
457
458         log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(),
459                 newBehavior.state(), context.getTermInformation().getCurrentTerm());
460         try {
461             close();
462         } catch (RuntimeException e) {
463             log.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
464         }
465         return newBehavior;
466     }
467
468
469     protected int getMajorityVoteCount(final int numPeers) {
470         // Votes are required from a majority of the peers including self.
471         // The numMajority field therefore stores a calculated value
472         // of the number of votes required for this candidate to win an
473         // election based on it's known peers.
474         // If a peer was added during normal operation and raft replicas
475         // came to know about them then the new peer would also need to be
476         // taken into consideration when calculating this value.
477         // Here are some examples for what the numMajority would be for n
478         // peers
479         // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
480         // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
481         // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
482
483         int numMajority = 0;
484         if (numPeers > 0) {
485             int self = 1;
486             numMajority = (numPeers + self) / 2 + 1;
487         }
488         return numMajority;
489
490     }
491
492
493     /**
494      * Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or
495      * lastApplied-1 which ever is minimum.
496      *
497      * @param snapshotCapturedIndex the index from which to clear
498      */
499     protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
500         long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
501
502         if (actualIndex != -1) {
503             setReplicatedToAllIndex(actualIndex);
504         }
505     }
506
507     protected String getId() {
508         return context.getId();
509     }
510
511     // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
512     // messages, as the candidate is not able to receive our response.
513     protected boolean shouldUpdateTerm(final RaftRPC rpc) {
514         if (!(rpc instanceof RequestVote)) {
515             return true;
516         }
517
518         final RequestVote requestVote = (RequestVote) rpc;
519         log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
520         final Optional<Cluster> maybeCluster = context.getCluster();
521         if (!maybeCluster.isPresent()) {
522             return true;
523         }
524
525         final Cluster cluster = maybeCluster.get();
526
527         final Set<Member> unreachable = cluster.state().getUnreachable();
528         log.debug("{}: Cluster state: {}", logName(), unreachable);
529
530         for (Member member : unreachable) {
531             for (String role : member.getRoles()) {
532                 if (requestVote.getCandidateId().startsWith(role)) {
533                     log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
534                         member, requestVote);
535                     return false;
536                 }
537             }
538         }
539
540         log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),
541             requestVote);
542         return true;
543     }
544 }