2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.cluster.Cluster;
15 import akka.cluster.Member;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.Optional;
18 import java.util.Random;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.cluster.raft.RaftActorContext;
22 import org.opendaylight.controller.cluster.raft.RaftState;
23 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
25 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
26 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
27 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
28 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
29 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
30 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
31 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
32 import org.slf4j.Logger;
33 import scala.concurrent.duration.FiniteDuration;
36 * Abstract class that provides common code for a RaftActor behavior.
38 public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
40 * Information about the RaftActor whose behavior this class represents.
42 protected final RaftActorContext context;
45 * Used for message logging.
47 @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_PRIVATE")
48 protected final Logger log;
51 * Prepended to log messages to provide appropriate context.
53 private final String logName;
56 * The RaftState corresponding to his behavior.
58 private final RaftState state;
61 * Used to cancel a scheduled election.
63 private Cancellable electionCancel = null;
66 * The index of the last log entry that has been replicated to all raft peers.
68 private long replicatedToAllIndex = -1;
70 AbstractRaftActorBehavior(final RaftActorContext context, final RaftState state) {
71 this.context = requireNonNull(context);
72 this.state = requireNonNull(state);
73 this.log = context.getLogger();
75 logName = String.format("%s (%s)", context.getId(), state);
78 public static RaftActorBehavior createBehavior(final RaftActorContext context, final RaftState state) {
81 return new Candidate(context);
83 return new Follower(context);
85 return new IsolatedLeader(context);
87 return new Leader(context);
89 return new PreLeader(context);
91 throw new IllegalArgumentException("Unhandled state " + state);
96 public final RaftState state() {
100 protected final String logName() {
105 public void setReplicatedToAllIndex(final long replicatedToAllIndex) {
106 this.replicatedToAllIndex = replicatedToAllIndex;
110 public long getReplicatedToAllIndex() {
111 return replicatedToAllIndex;
115 * Derived classes should not directly handle AppendEntries messages it
116 * should let the base class handle it first. Once the base class handles
117 * the AppendEntries message and does the common actions that are applicable
118 * in all RaftState's it will delegate the handling of the AppendEntries
119 * message to the derived class to do more state specific handling by calling
122 * @param sender The actor that sent this message
123 * @param appendEntries The AppendEntries message
124 * @return a new behavior if it was changed or the current behavior
126 protected abstract RaftActorBehavior handleAppendEntries(ActorRef sender,
127 AppendEntries appendEntries);
130 * Handles the common logic for the AppendEntries message and delegates handling to the derived class.
132 * @param sender the ActorRef that sent the message
133 * @param appendEntries the message
134 * @return a new behavior if it was changed or the current behavior
136 protected RaftActorBehavior appendEntries(final ActorRef sender, final AppendEntries appendEntries) {
138 // 1. Reply false if term < currentTerm (ยง5.1)
139 if (appendEntries.getTerm() < currentTerm()) {
140 log.info("{}: Cannot append entries because sender's term {} is less than {}", logName(),
141 appendEntries.getTerm(), currentTerm());
143 sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex(), lastTerm(),
144 context.getPayloadVersion(), false, false, appendEntries.getLeaderRaftVersion()), actor());
149 return handleAppendEntries(sender, appendEntries);
153 * Derived classes should not directly handle AppendEntriesReply messages it
154 * should let the base class handle it first. Once the base class handles
155 * the AppendEntriesReply message and does the common actions that are
156 * applicable in all RaftState's it will delegate the handling of the
157 * AppendEntriesReply message to the derived class to do more state specific
158 * handling by calling this method
160 * @param sender The actor that sent this message
161 * @param appendEntriesReply The AppendEntriesReply message
162 * @return a new behavior if it was changed or the current behavior
164 protected abstract RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
165 AppendEntriesReply appendEntriesReply);
168 * Handles the logic for the RequestVote message that is common for all behaviors.
170 * @param sender the ActorRef that sent the message
171 * @param requestVote the message
172 * @return a new behavior if it was changed or the current behavior
174 protected RaftActorBehavior requestVote(final ActorRef sender, final RequestVote requestVote) {
176 log.debug("{}: In requestVote: {} - currentTerm: {}, votedFor: {}, lastIndex: {}, lastTerm: {}", logName(),
177 requestVote, currentTerm(), votedFor(), lastIndex(), lastTerm());
179 boolean grantVote = canGrantVote(requestVote);
182 context.getTermInformation().updateAndPersist(requestVote.getTerm(), requestVote.getCandidateId());
185 RequestVoteReply reply = new RequestVoteReply(currentTerm(), grantVote);
187 log.debug("{}: requestVote returning: {}", logName(), reply);
189 sender.tell(reply, actor());
194 protected boolean canGrantVote(final RequestVote requestVote) {
195 boolean grantVote = false;
197 // Reply false if term < currentTerm (ยง5.1)
198 if (requestVote.getTerm() < currentTerm()) {
201 // If votedFor is null or candidateId, and candidateโs log is at
202 // least as up-to-date as receiverโs log, grant vote (ยง5.2, ยง5.4)
203 } else if (votedFor() == null || votedFor()
204 .equals(requestVote.getCandidateId())) {
206 boolean candidateLatest = false;
209 // Raft determines which of two logs is more up-to-date
210 // by comparing the index and term of the last entries in the
211 // logs. If the logs have last entries with different terms, then
212 // the log with the later term is more up-to-date. If the logs
213 // end with the same term, then whichever log is longer is
215 if (requestVote.getLastLogTerm() > lastTerm()) {
216 candidateLatest = true;
217 } else if (requestVote.getLastLogTerm() == lastTerm()
218 && requestVote.getLastLogIndex() >= lastIndex()) {
219 candidateLatest = true;
222 if (candidateLatest) {
230 * Derived classes should not directly handle RequestVoteReply messages it
231 * should let the base class handle it first. Once the base class handles
232 * the RequestVoteReply message and does the common actions that are
233 * applicable in all RaftState's it will delegate the handling of the
234 * RequestVoteReply message to the derived class to do more state specific
235 * handling by calling this method
237 * @param sender The actor that sent this message
238 * @param requestVoteReply The RequestVoteReply message
239 * @return a new behavior if it was changed or the current behavior
241 protected abstract RaftActorBehavior handleRequestVoteReply(ActorRef sender,
242 RequestVoteReply requestVoteReply);
245 * Returns a duration for election with an additional variance for randomness.
247 * @return a random election duration
249 protected FiniteDuration electionDuration() {
250 long variance = new Random().nextInt(context.getConfigParams().getElectionTimeVariance());
251 return context.getConfigParams().getElectionTimeOutInterval().$plus(
252 new FiniteDuration(variance, TimeUnit.MILLISECONDS));
256 * Stops the currently scheduled election.
258 protected void stopElection() {
259 if (electionCancel != null && !electionCancel.isCancelled()) {
260 electionCancel.cancel();
264 protected boolean canStartElection() {
265 return context.getRaftPolicy().automaticElectionsEnabled() && context.isVotingMember();
269 * Schedule a new election.
271 * @param interval the duration after which we should trigger a new election
273 protected void scheduleElection(final FiniteDuration interval) {
276 // Schedule an election. When the scheduler triggers an ElectionTimeout message is sent to itself
277 electionCancel = context.getActorSystem().scheduler().scheduleOnce(interval, context.getActor(),
278 ElectionTimeout.INSTANCE, context.getActorSystem().dispatcher(), context.getActor());
282 * Returns the current election term.
284 * @return the current term
286 protected long currentTerm() {
287 return context.getTermInformation().getCurrentTerm();
291 * Returns the id of the candidate that this server voted for in current term.
293 * @return the candidate for whom we voted in the current term
295 protected String votedFor() {
296 return context.getTermInformation().getVotedFor();
300 * Returns the actor associated with this behavior.
304 protected ActorRef actor() {
305 return context.getActor();
309 * Returns the term of the last entry in the log.
313 protected long lastTerm() {
314 return context.getReplicatedLog().lastTerm();
318 * Returns the index of the last entry in the log.
322 protected long lastIndex() {
323 return context.getReplicatedLog().lastIndex();
327 * Returns the actual index of the entry in replicated log for the given index or -1 if not found.
329 * @return the log entry index or -1 if not found
331 protected long getLogEntryIndex(final long index) {
332 if (index == context.getReplicatedLog().getSnapshotIndex()) {
333 return context.getReplicatedLog().getSnapshotIndex();
336 ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
338 return entry.getIndex();
345 * Returns the actual term of the entry in the replicated log for the given index or -1 if not found.
347 * @return the log entry term or -1 if not found
349 protected long getLogEntryTerm(final long index) {
350 if (index == context.getReplicatedLog().getSnapshotIndex()) {
351 return context.getReplicatedLog().getSnapshotTerm();
354 ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
356 return entry.getTerm();
363 * Returns the actual term of the entry in the replicated log for the given index or, if not present, returns the
364 * snapshot term if the given index is in the snapshot or -1 otherwise.
366 * @return the term or -1 otherwise
368 protected long getLogEntryOrSnapshotTerm(final long index) {
369 if (context.getReplicatedLog().isInSnapshot(index)) {
370 return context.getReplicatedLog().getSnapshotTerm();
373 return getLogEntryTerm(index);
377 * Applies the log entries up to the specified index that is known to be committed to the state machine.
379 * @param index the log index
381 protected void applyLogToStateMachine(final long index) {
382 // Now maybe we apply to the state machine
383 for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
385 ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
386 if (replicatedLogEntry != null) {
387 // Send a local message to the local RaftActor (it's derived class to be
388 // specific to apply the log to it's index)
390 final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
392 log.debug("{}: Setting last applied to {}", logName(), i);
394 context.setLastApplied(i);
395 context.getApplyStateConsumer().accept(applyState);
397 //if one index is not present in the log, no point in looping
398 // around as the rest wont be present either
399 log.warn("{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
400 logName(), i, i, index);
405 // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
406 // will be used during recovery
407 //in case if the above code throws an error and this message is not sent, it would be fine
408 // as the append entries received later would initiate add this message to the journal
409 actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
413 * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
415 * @param entry the log entry
416 * @return ApplyState for this entry
418 abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
421 public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
422 if (message instanceof AppendEntries) {
423 return appendEntries(sender, (AppendEntries) message);
424 } else if (message instanceof AppendEntriesReply) {
425 return handleAppendEntriesReply(sender, (AppendEntriesReply) message);
426 } else if (message instanceof RequestVote) {
427 return requestVote(sender, (RequestVote) message);
428 } else if (message instanceof RequestVoteReply) {
429 return handleRequestVoteReply(sender, (RequestVoteReply) message);
436 public RaftActorBehavior switchBehavior(final RaftActorBehavior behavior) {
437 return internalSwitchBehavior(behavior);
440 protected RaftActorBehavior internalSwitchBehavior(final RaftState newState) {
441 return internalSwitchBehavior(createBehavior(context, newState));
444 @SuppressWarnings("checkstyle:IllegalCatch")
445 protected RaftActorBehavior internalSwitchBehavior(final RaftActorBehavior newBehavior) {
446 if (!context.getRaftPolicy().automaticElectionsEnabled()) {
450 log.info("{} :- Switching from behavior {} to {}, election term: {}", logName(), this.state(),
451 newBehavior.state(), context.getTermInformation().getCurrentTerm());
454 } catch (RuntimeException e) {
455 log.error("{}: Failed to close behavior : {}", logName(), this.state(), e);
461 protected int getMajorityVoteCount(final int numPeers) {
462 // Votes are required from a majority of the peers including self.
463 // The numMajority field therefore stores a calculated value
464 // of the number of votes required for this candidate to win an
465 // election based on it's known peers.
466 // If a peer was added during normal operation and raft replicas
467 // came to know about them then the new peer would also need to be
468 // taken into consideration when calculating this value.
469 // Here are some examples for what the numMajority would be for n
471 // 0 peers = 1 numMajority -: (0 + 1) / 2 + 1 = 1
472 // 2 peers = 2 numMajority -: (2 + 1) / 2 + 1 = 2
473 // 4 peers = 3 numMajority -: (4 + 1) / 2 + 1 = 3
478 numMajority = (numPeers + self) / 2 + 1;
486 * Performs a snapshot with no capture on the replicated log. It clears the log from the supplied index or
487 * lastApplied-1 which ever is minimum.
489 * @param snapshotCapturedIndex the index from which to clear
491 protected void performSnapshotWithoutCapture(final long snapshotCapturedIndex) {
492 long actualIndex = context.getSnapshotManager().trimLog(snapshotCapturedIndex);
494 if (actualIndex != -1) {
495 setReplicatedToAllIndex(actualIndex);
499 protected String getId() {
500 return context.getId();
503 // Check whether we should update the term. In case of half-connected nodes, we want to ignore RequestVote
504 // messages, as the candidate is not able to receive our response.
505 protected boolean shouldUpdateTerm(final RaftRPC rpc) {
506 if (!(rpc instanceof RequestVote)) {
510 final RequestVote requestVote = (RequestVote) rpc;
511 log.debug("{}: Found higher term in RequestVote rpc, verifying whether it's safe to update term.", logName());
512 final Optional<Cluster> maybeCluster = context.getCluster();
513 if (!maybeCluster.isPresent()) {
517 final Cluster cluster = maybeCluster.get();
519 final Set<Member> unreachable = cluster.state().getUnreachable();
520 log.debug("{}: Cluster state: {}", logName(), unreachable);
522 for (Member member : unreachable) {
523 for (String role : member.getRoles()) {
524 if (requestVote.getCandidateId().startsWith(role)) {
525 log.debug("{}: Unreachable member: {}, matches candidateId in: {}, not updating term", logName(),
526 member, requestVote);
532 log.debug("{}: Candidate in requestVote:{} with higher term appears reachable, updating term.", logName(),