2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
32 import java.io.Serializable;
33 import java.util.ArrayList;
34 import java.util.List;
38 * RaftActor encapsulates a state machine that needs to be kept synchronized
39 * in a cluster. It implements the RAFT algorithm as described in the paper
40 * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
41 * In Search of an Understandable Consensus Algorithm</a>
43 * RaftActor has 3 states and each state has a certain behavior associated
44 * with it. A Raft actor can behave as,
47 * <li> A Follower (or) </li>
48 * <li> A Candidate </li>
52 * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
54 * a Leader it is also responsible for ensuring that all followers ultimately
55 * have the same log and therefore the same state machine as itself.
58 * The current behavior of a RaftActor determines how election for leadership
 * is initiated and how peer RaftActors react to requests for votes.
62 * Each RaftActor also needs to know the current election term. It uses this
63 * information for a couple of things. One is to simply figure out who it
64 * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a Raft Actor determines
72 * <li> when a log entry should be persisted </li>
73 * <li> when a log entry should be applied to the state machine (and) </li>
74 * <li> when a snapshot should be saved </li>
77 public abstract class RaftActor extends UntypedPersistentActor {
/**
 * Logger for this actor. The constructor also hands it to the
 * RaftActorContext, so the behaviors log through the same adapter.
 */
protected final LoggingAdapter LOG =
    Logging.getLogger(getContext().system(), this);

/**
 * The current state determines the current behavior of a RaftActor.
 * It is null until recovery completes. At that point the actor switches to
 * the Follower state, so a Raft Actor always starts off as a Follower.
 */
private RaftActorBehavior currentBehavior;

/**
 * This context should NOT be passed directly to any other actor; it is
 * only to be consumed by the RaftActorBehaviors.
 */
private RaftActorContext context;

/**
 * The in-memory journal. It must be initialized before the constructor runs,
 * because the constructor passes this instance to the RaftActorContext.
 * NOTE(review): onReceiveRecover replaces this field when a SnapshotOffer
 * arrives, but the context still holds the instance created here. Confirm
 * the context is updated elsewhere; otherwise behaviors see the stale log.
 */
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
99 public RaftActor(String id, Map<String, String> peerAddresses) {
100 context = new RaftActorContextImpl(this.getSelf(),
102 id, new ElectionTermImpl(getSelf().path().toString()),
103 -1, -1, replicatedLog, peerAddresses, LOG);
106 @Override public void onReceiveRecover(Object message) {
107 if (message instanceof SnapshotOffer) {
108 SnapshotOffer offer = (SnapshotOffer) message;
109 Snapshot snapshot = (Snapshot) offer.snapshot();
111 // Create a replicated log with the snapshot information
112 // The replicated log can be used later on to retrieve this snapshot
113 // when we need to install it on a peer
114 replicatedLog = new ReplicatedLogImpl(snapshot);
116 // Apply the snapshot to the actors state
117 applySnapshot(snapshot.getState());
119 } else if (message instanceof ReplicatedLogEntry) {
120 replicatedLog.append((ReplicatedLogEntry) message);
121 } else if (message instanceof RecoveryCompleted) {
123 "Last index in log : " + replicatedLog.lastIndex());
124 currentBehavior = switchBehavior(RaftState.Follower);
128 @Override public void onReceiveCommand(Object message) {
129 if (message instanceof ApplyState) {
131 ApplyState applyState = (ApplyState) message;
133 LOG.debug("Applying state for log index {}",
134 applyState.getReplicatedLogEntry().getIndex());
136 applyState(applyState.getClientActor(), applyState.getIdentifier(),
137 applyState.getReplicatedLogEntry().getData());
138 } else if(message instanceof ApplySnapshot ) {
139 applySnapshot(((ApplySnapshot) message).getSnapshot());
140 } else if (message instanceof FindLeader) {
143 context.getPeerAddress(currentBehavior.getLeaderId())),
146 } else if (message instanceof SaveSnapshotSuccess) {
147 SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
149 // TODO: Not sure if we want to be this aggressive with trimming stuff
150 trimPersistentData(success.metadata().sequenceNr());
152 } else if (message instanceof SaveSnapshotFailure) {
153 // TODO: Handle failure in saving the snapshot
156 currentBehavior.handleMessage(getSender(), message);
157 currentBehavior = switchBehavior(state);
164 * When a derived RaftActor needs to persist something it must call
171 protected void persistData(ActorRef clientActor, String identifier,
173 LOG.debug("Persist data " + identifier);
174 ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
175 context.getReplicatedLog().lastIndex() + 1,
176 context.getTermInformation().getCurrentTerm(), data);
179 .appendAndPersist(clientActor, identifier, replicatedLogEntry);
182 protected String getId() {
183 return context.getId();
187 * Derived actors can call the isLeader method to check if the current
188 * RaftActor is the Leader or not
190 * @return true it this RaftActor is a Leader false otherwise
192 protected boolean isLeader() {
193 return context.getId().equals(currentBehavior.getLeaderId());
197 * Derived actor can call getLeader if they need a reference to the Leader.
198 * This would be useful for example in forwarding a request to an actor
199 * which is the leader
201 * @return A reference to the leader if known, null otherwise
203 protected ActorSelection getLeader() {
204 String leaderId = currentBehavior.getLeaderId();
205 if (leaderId == null) {
208 String peerAddress = context.getPeerAddress(leaderId);
209 LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
211 return context.actorSelection(peerAddress);
215 * The applyState method will be called by the RaftActor when some data
216 * needs to be applied to the actor's state
218 * @param clientActor A reference to the client who sent this message. This
219 * is the same reference that was passed to persistData
220 * by the derived actor. clientActor may be null when
221 * the RaftActor is behaving as a follower or during
223 * @param identifier The identifier of the persisted data. This is also
224 * the same identifier that was passed to persistData by
225 * the derived actor. identifier may be null when
226 * the RaftActor is behaving as a follower or during
228 * @param data A piece of data that was persisted by the persistData call.
229 * This should NEVER be null.
231 protected abstract void applyState(ActorRef clientActor, String identifier,
/**
 * This method will be called by the RaftActor when a snapshot needs to be
 * created. The derived actor should respond with its current state.
 * In this class it is invoked from the replicated log's persist handler once
 * the log grows past 100000 entries.
 * <p>
 * During recovery the state that is returned by the derived actor will
 * be passed back to it by calling the applySnapshot method.
 *
 * @return The current state of the actor
 */
protected abstract Object createSnapshot();
/**
 * This method will be called by the RaftActor during recovery to
 * reconstruct the state of the actor (when a snapshot is offered).
 * <p>
 * This method may also be called at any other point during normal
 * operations (on an ApplySnapshot message), when the derived actor is out
 * of sync with its peers and the only way to bring it in sync is by applying
 * a snapshot.
 *
 * @param snapshot A snapshot of the state of the actor, as previously
 *                 returned by createSnapshot
 */
protected abstract void applySnapshot(Object snapshot);
257 private RaftActorBehavior switchBehavior(RaftState state) {
258 if (currentBehavior != null) {
259 if (currentBehavior.state() == state) {
260 return currentBehavior;
262 LOG.info("Switching from state " + currentBehavior.state() + " to "
266 currentBehavior.close();
267 } catch (Exception e) {
269 "Failed to close behavior : " + currentBehavior.state());
273 LOG.info("Switching behavior to " + state);
275 RaftActorBehavior behavior = null;
276 if (state == RaftState.Candidate) {
277 behavior = new Candidate(context);
278 } else if (state == RaftState.Follower) {
279 behavior = new Follower(context);
281 behavior = new Leader(context);
286 private void trimPersistentData(long sequenceNumber) {
288 // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
289 // For now guessing that it is ANDed.
290 deleteSnapshots(new SnapshotSelectionCriteria(
291 sequenceNumber - 100000, 43200000));
294 deleteMessages(sequenceNumber);
297 private class ReplicatedLogImpl implements ReplicatedLog {
298 private final List<ReplicatedLogEntry> journal;
299 private final Object snapshot;
300 private long snapshotIndex = -1;
301 private long snapshotTerm = -1;
303 public ReplicatedLogImpl(Snapshot snapshot) {
304 this.snapshot = snapshot.getState();
305 this.snapshotIndex = snapshot.getLastAppliedIndex();
306 this.snapshotTerm = snapshot.getLastAppliedTerm();
308 this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
311 public ReplicatedLogImpl() {
312 this.snapshot = null;
313 this.journal = new ArrayList<>();
316 @Override public ReplicatedLogEntry get(long index) {
317 int adjustedIndex = adjustedIndex(index);
319 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
323 return journal.get(adjustedIndex);
326 @Override public ReplicatedLogEntry last() {
327 if (journal.size() == 0) {
330 return get(journal.size() - 1);
333 @Override public long lastIndex() {
334 if (journal.size() == 0) {
338 return last().getIndex();
341 @Override public long lastTerm() {
342 if (journal.size() == 0) {
346 return last().getTerm();
350 @Override public void removeFrom(long index) {
351 int adjustedIndex = adjustedIndex(index);
353 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
356 for (int i = adjustedIndex;
357 i < journal.size(); i++) {
363 @Override public void append(
364 final ReplicatedLogEntry replicatedLogEntry) {
365 journal.add(replicatedLogEntry);
368 @Override public List<ReplicatedLogEntry> getFrom(long index) {
369 int adjustedIndex = adjustedIndex(index);
371 List<ReplicatedLogEntry> entries = new ArrayList<>(100);
372 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
375 for (int i = adjustedIndex;
376 i < journal.size(); i++) {
377 entries.add(journal.get(i));
382 @Override public void appendAndPersist(
383 final ReplicatedLogEntry replicatedLogEntry) {
384 appendAndPersist(null, null, replicatedLogEntry);
387 public void appendAndPersist(final ActorRef clientActor,
388 final String identifier,
389 final ReplicatedLogEntry replicatedLogEntry) {
390 context.getLogger().debug(
391 "Append log entry and persist " + replicatedLogEntry.getIndex());
392 // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
393 journal.add(replicatedLogEntry);
395 // When persisting events with persist it is guaranteed that the
396 // persistent actor will not receive further commands between the
397 // persist call and the execution(s) of the associated event
398 // handler. This also holds for multiple persist calls in context
399 // of a single command.
400 persist(replicatedLogEntry,
401 new Procedure<ReplicatedLogEntry>() {
402 public void apply(ReplicatedLogEntry evt) throws Exception {
403 // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
404 if (size() > 100000) {
405 ReplicatedLogEntry lastAppliedEntry =
406 get(context.getLastApplied());
407 long lastAppliedIndex = -1;
408 long lastAppliedTerm = -1;
409 if (lastAppliedEntry != null) {
410 lastAppliedIndex = lastAppliedEntry.getIndex();
411 lastAppliedTerm = lastAppliedEntry.getTerm();
414 saveSnapshot(Snapshot.create(createSnapshot(),
415 getFrom(context.getLastApplied() + 1),
416 lastIndex(), lastTerm(), lastAppliedIndex,
419 // Send message for replication
420 if (clientActor != null) {
421 currentBehavior.handleMessage(getSelf(),
422 new Replicate(clientActor, identifier,
431 @Override public long size() {
432 return journal.size() + snapshotIndex;
435 @Override public boolean isPresent(long index) {
436 int adjustedIndex = adjustedIndex(index);
438 if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
444 @Override public boolean isInSnapshot(long index) {
445 return index <= snapshotIndex;
448 @Override public Object getSnapshot() {
452 @Override public long getSnapshotIndex() {
453 return snapshotIndex;
456 @Override public long getSnapshotTerm() {
460 private int adjustedIndex(long index) {
461 if(snapshotIndex < 0){
464 return (int) (index - snapshotIndex);
469 private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
472 private final long index;
473 private final long term;
474 private final Object payload;
476 public ReplicatedLogImplEntry(long index, long term, Object payload) {
480 this.payload = payload;
483 @Override public Object getData() {
487 @Override public long getTerm() {
491 @Override public long getIndex() {
497 private static class Snapshot implements Serializable {
// State returned by the derived actor's createSnapshot().
private final Object state;
// Log entries after the last applied index; they seed the journal on recovery.
private final List<ReplicatedLogEntry> unAppliedEntries;
// Index and term of the last log entry when the snapshot was taken.
private final long lastIndex;
private final long lastTerm;
// Index and term of the last applied entry (-1 if it was not found in the log).
private final long lastAppliedIndex;
private final long lastAppliedTerm;
/** Private; instances are obtained through {@link #create}. */
private Snapshot(Object state,
    List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
    long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {

    this.state = state;
    this.unAppliedEntries = unAppliedEntries;

    this.lastIndex = lastIndex;
    this.lastTerm = lastTerm;

    this.lastAppliedIndex = lastAppliedIndex;
    this.lastAppliedTerm = lastAppliedTerm;
}
517 public static Snapshot create(Object state,
518 List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
519 long lastAppliedIndex, long lastAppliedTerm) {
520 return new Snapshot(state, entries, lastIndex, lastTerm,
521 lastAppliedIndex, lastAppliedTerm);
524 public Object getState() {
528 public List<ReplicatedLogEntry> getUnAppliedEntries() {
529 return unAppliedEntries;
532 public long getLastTerm() {
536 public long getLastAppliedIndex() {
537 return lastAppliedIndex;
540 public long getLastAppliedTerm() {
541 return lastAppliedTerm;