Initial implementation of saving and installing snapshots
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
28 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
30 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
31
32 import java.io.Serializable;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 /**
38  * RaftActor encapsulates a state machine that needs to be kept synchronized
39  * in a cluster. It implements the RAFT algorithm as described in the paper
40  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
41  * In Search of an Understandable Consensus Algorithm</a>
42  * <p/>
43  * RaftActor has 3 states and each state has a certain behavior associated
44  * with it. A Raft actor can behave as,
45  * <ul>
46  * <li> A Leader </li>
47  * <li> A Follower (or) </li>
48  * <li> A Candidate </li>
49  * </ul>
50  * <p/>
51  * <p/>
52  * A RaftActor MUST be a Leader in order to accept requests from clients to
53  * change the state of it's encapsulated state machine. Once a RaftActor becomes
54  * a Leader it is also responsible for ensuring that all followers ultimately
55  * have the same log and therefore the same state machine as itself.
56  * <p/>
57  * <p/>
58  * The current behavior of a RaftActor determines how election for leadership
59  * is initiated and how peer RaftActors react to request for votes.
60  * <p/>
61  * <p/>
62  * Each RaftActor also needs to know the current election term. It uses this
63  * information for a couple of things. One is to simply figure out who it
64  * voted for in the last election. Another is to figure out if the message
65  * it received to update it's state is stale.
66  * <p/>
67  * <p/>
68  * The RaftActor uses akka-persistence to store it's replicated log.
69  * Furthermore through it's behaviors a Raft Actor determines
70  * <p/>
71  * <ul>
72  * <li> when a log entry should be persisted </li>
73  * <li> when a log entry should be applied to the state machine (and) </li>
74  * <li> when a snapshot should be saved </li>
75  * </ul>
76  */
77 public abstract class RaftActor extends UntypedPersistentActor {
78     protected final LoggingAdapter LOG =
79         Logging.getLogger(getContext().system(), this);
80
81     /**
82      * The current state determines the current behavior of a RaftActor
83      * A Raft Actor always starts off in the Follower State
84      */
85     private RaftActorBehavior currentBehavior;
86
87     /**
88      * This context should NOT be passed directly to any other actor it is
89      * only to be consumed by the RaftActorBehaviors
90      */
91     private RaftActorContext context;
92
93     /**
94      * The in-memory journal
95      */
96     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
97
98
99     public RaftActor(String id, Map<String, String> peerAddresses) {
100         context = new RaftActorContextImpl(this.getSelf(),
101             this.getContext(),
102             id, new ElectionTermImpl(getSelf().path().toString()),
103             -1, -1, replicatedLog, peerAddresses, LOG);
104     }
105
106     @Override public void onReceiveRecover(Object message) {
107         if (message instanceof SnapshotOffer) {
108             SnapshotOffer offer = (SnapshotOffer) message;
109             Snapshot snapshot = (Snapshot) offer.snapshot();
110
111             // Create a replicated log with the snapshot information
112             // The replicated log can be used later on to retrieve this snapshot
113             // when we need to install it on a peer
114             replicatedLog = new ReplicatedLogImpl(snapshot);
115
116             // Apply the snapshot to the actors state
117             applySnapshot(snapshot.getState());
118
119         } else if (message instanceof ReplicatedLogEntry) {
120             replicatedLog.append((ReplicatedLogEntry) message);
121         } else if (message instanceof RecoveryCompleted) {
122             LOG.debug(
123                 "Last index in log : " + replicatedLog.lastIndex());
124             currentBehavior = switchBehavior(RaftState.Follower);
125         }
126     }
127
128     @Override public void onReceiveCommand(Object message) {
129         if (message instanceof ApplyState) {
130
131             ApplyState applyState = (ApplyState) message;
132
133             LOG.debug("Applying state for log index {}",
134                 applyState.getReplicatedLogEntry().getIndex());
135
136             applyState(applyState.getClientActor(), applyState.getIdentifier(),
137                 applyState.getReplicatedLogEntry().getData());
138         } else if(message instanceof ApplySnapshot ) {
139             applySnapshot(((ApplySnapshot) message).getSnapshot());
140         } else if (message instanceof FindLeader) {
141             getSender().tell(
142                 new FindLeaderReply(
143                     context.getPeerAddress(currentBehavior.getLeaderId())),
144                 getSelf()
145             );
146         } else if (message instanceof SaveSnapshotSuccess) {
147             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
148
149             // TODO: Not sure if we want to be this aggressive with trimming stuff
150             trimPersistentData(success.metadata().sequenceNr());
151
152         } else if (message instanceof SaveSnapshotFailure) {
153             // TODO: Handle failure in saving the snapshot
154         } else {
155             RaftState state =
156                 currentBehavior.handleMessage(getSender(), message);
157             currentBehavior = switchBehavior(state);
158         }
159     }
160
161
162
163     /**
164      * When a derived RaftActor needs to persist something it must call
165      * persistData.
166      *
167      * @param clientActor
168      * @param identifier
169      * @param data
170      */
171     protected void persistData(ActorRef clientActor, String identifier,
172         Object data) {
173         LOG.debug("Persist data " + identifier);
174         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
175             context.getReplicatedLog().lastIndex() + 1,
176             context.getTermInformation().getCurrentTerm(), data);
177
178         replicatedLog
179             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
180     }
181
182     protected String getId() {
183         return context.getId();
184     }
185
186     /**
187      * Derived actors can call the isLeader method to check if the current
188      * RaftActor is the Leader or not
189      *
190      * @return true it this RaftActor is a Leader false otherwise
191      */
192     protected boolean isLeader() {
193         return context.getId().equals(currentBehavior.getLeaderId());
194     }
195
196     /**
197      * Derived actor can call getLeader if they need a reference to the Leader.
198      * This would be useful for example in forwarding a request to an actor
199      * which is the leader
200      *
201      * @return A reference to the leader if known, null otherwise
202      */
203     protected ActorSelection getLeader() {
204         String leaderId = currentBehavior.getLeaderId();
205         if (leaderId == null) {
206             return null;
207         }
208         String peerAddress = context.getPeerAddress(leaderId);
209         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
210             + peerAddress);
211         return context.actorSelection(peerAddress);
212     }
213
214     /**
215      * The applyState method will be called by the RaftActor when some data
216      * needs to be applied to the actor's state
217      *
218      * @param clientActor A reference to the client who sent this message. This
219      *                    is the same reference that was passed to persistData
220      *                    by the derived actor. clientActor may be null when
221      *                    the RaftActor is behaving as a follower or during
222      *                    recovery.
223      * @param identifier  The identifier of the persisted data. This is also
224      *                    the same identifier that was passed to persistData by
225      *                    the derived actor. identifier may be null when
226      *                    the RaftActor is behaving as a follower or during
227      *                    recovery
228      * @param data        A piece of data that was persisted by the persistData call.
229      *                    This should NEVER be null.
230      */
231     protected abstract void applyState(ActorRef clientActor, String identifier,
232         Object data);
233
234     /**
235      * This method will be called by the RaftActor when a snapshot needs to be
236      * created. The derived actor should respond with its current state.
237      * <p/>
238      * During recovery the state that is returned by the derived actor will
239      * be passed back to it by calling the applySnapshot  method
240      *
241      * @return The current state of the actor
242      */
243     protected abstract Object createSnapshot();
244
245     /**
246      * This method will be called by the RaftActor during recovery to
247      * reconstruct the state of the actor.
248      * <p/>
249      * This method may also be called at any other point during normal
250      * operations when the derived actor is out of sync with it's peers
251      * and the only way to bring it in sync is by applying a snapshot
252      *
253      * @param snapshot A snapshot of the state of the actor
254      */
255     protected abstract void applySnapshot(Object snapshot);
256
257     private RaftActorBehavior switchBehavior(RaftState state) {
258         if (currentBehavior != null) {
259             if (currentBehavior.state() == state) {
260                 return currentBehavior;
261             }
262             LOG.info("Switching from state " + currentBehavior.state() + " to "
263                 + state);
264
265             try {
266                 currentBehavior.close();
267             } catch (Exception e) {
268                 LOG.error(e,
269                     "Failed to close behavior : " + currentBehavior.state());
270             }
271
272         } else {
273             LOG.info("Switching behavior to " + state);
274         }
275         RaftActorBehavior behavior = null;
276         if (state == RaftState.Candidate) {
277             behavior = new Candidate(context);
278         } else if (state == RaftState.Follower) {
279             behavior = new Follower(context);
280         } else {
281             behavior = new Leader(context);
282         }
283         return behavior;
284     }
285
286     private void trimPersistentData(long sequenceNumber) {
287         // Trim snapshots
288         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
289         // For now guessing that it is ANDed.
290         deleteSnapshots(new SnapshotSelectionCriteria(
291             sequenceNumber - 100000, 43200000));
292
293         // Trim journal
294         deleteMessages(sequenceNumber);
295     }
296
297     private class ReplicatedLogImpl implements ReplicatedLog {
298         private final List<ReplicatedLogEntry> journal;
299         private final Object snapshot;
300         private long snapshotIndex = -1;
301         private long snapshotTerm = -1;
302
303         public ReplicatedLogImpl(Snapshot snapshot) {
304             this.snapshot = snapshot.getState();
305             this.snapshotIndex = snapshot.getLastAppliedIndex();
306             this.snapshotTerm = snapshot.getLastAppliedTerm();
307
308             this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
309         }
310
311         public ReplicatedLogImpl() {
312             this.snapshot = null;
313             this.journal = new ArrayList<>();
314         }
315
316         @Override public ReplicatedLogEntry get(long index) {
317             int adjustedIndex = adjustedIndex(index);
318
319             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
320                 return null;
321             }
322
323             return journal.get(adjustedIndex);
324         }
325
326         @Override public ReplicatedLogEntry last() {
327             if (journal.size() == 0) {
328                 return null;
329             }
330             return get(journal.size() - 1);
331         }
332
333         @Override public long lastIndex() {
334             if (journal.size() == 0) {
335                 return -1;
336             }
337
338             return last().getIndex();
339         }
340
341         @Override public long lastTerm() {
342             if (journal.size() == 0) {
343                 return -1;
344             }
345
346             return last().getTerm();
347         }
348
349
350         @Override public void removeFrom(long index) {
351             int adjustedIndex = adjustedIndex(index);
352
353             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
354                 return;
355             }
356             for (int i = adjustedIndex;
357                  i < journal.size(); i++) {
358                 deleteMessage(i);
359                 journal.remove(i);
360             }
361         }
362
363         @Override public void append(
364             final ReplicatedLogEntry replicatedLogEntry) {
365             journal.add(replicatedLogEntry);
366         }
367
368         @Override public List<ReplicatedLogEntry> getFrom(long index) {
369             int adjustedIndex = adjustedIndex(index);
370
371             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
372             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
373                 return entries;
374             }
375             for (int i = adjustedIndex;
376                  i < journal.size(); i++) {
377                 entries.add(journal.get(i));
378             }
379             return entries;
380         }
381
382         @Override public void appendAndPersist(
383             final ReplicatedLogEntry replicatedLogEntry) {
384             appendAndPersist(null, null, replicatedLogEntry);
385         }
386
387         public void appendAndPersist(final ActorRef clientActor,
388             final String identifier,
389             final ReplicatedLogEntry replicatedLogEntry) {
390             context.getLogger().debug(
391                 "Append log entry and persist " + replicatedLogEntry.getIndex());
392             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
393             journal.add(replicatedLogEntry);
394
395             // When persisting events with persist it is guaranteed that the
396             // persistent actor will not receive further commands between the
397             // persist call and the execution(s) of the associated event
398             // handler. This also holds for multiple persist calls in context
399             // of a single command.
400             persist(replicatedLogEntry,
401                 new Procedure<ReplicatedLogEntry>() {
402                     public void apply(ReplicatedLogEntry evt) throws Exception {
403                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
404                         if (size() > 100000) {
405                             ReplicatedLogEntry lastAppliedEntry =
406                                 get(context.getLastApplied());
407                             long lastAppliedIndex = -1;
408                             long lastAppliedTerm = -1;
409                             if (lastAppliedEntry != null) {
410                                 lastAppliedIndex = lastAppliedEntry.getIndex();
411                                 lastAppliedTerm = lastAppliedEntry.getTerm();
412                             }
413
414                             saveSnapshot(Snapshot.create(createSnapshot(),
415                                 getFrom(context.getLastApplied() + 1),
416                                 lastIndex(), lastTerm(), lastAppliedIndex,
417                                 lastAppliedTerm));
418                         }
419                         // Send message for replication
420                         if (clientActor != null) {
421                             currentBehavior.handleMessage(getSelf(),
422                                 new Replicate(clientActor, identifier,
423                                     replicatedLogEntry)
424                             );
425                         }
426                     }
427                 }
428             );
429         }
430
431         @Override public long size() {
432             return journal.size() + snapshotIndex;
433         }
434
435         @Override public boolean isPresent(long index) {
436             int adjustedIndex = adjustedIndex(index);
437
438             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
439                 return false;
440             }
441             return true;
442         }
443
444         @Override public boolean isInSnapshot(long index) {
445             return index <= snapshotIndex;
446         }
447
448         @Override public Object getSnapshot() {
449             return snapshot;
450         }
451
452         @Override public long getSnapshotIndex() {
453             return snapshotIndex;
454         }
455
456         @Override public long getSnapshotTerm() {
457             return snapshotTerm;
458         }
459
460         private int adjustedIndex(long index) {
461             if(snapshotIndex < 0){
462                 return (int) index;
463             }
464             return (int) (index - snapshotIndex);
465         }
466     }
467
468
469     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
470         Serializable {
471
472         private final long index;
473         private final long term;
474         private final Object payload;
475
476         public ReplicatedLogImplEntry(long index, long term, Object payload) {
477
478             this.index = index;
479             this.term = term;
480             this.payload = payload;
481         }
482
483         @Override public Object getData() {
484             return payload;
485         }
486
487         @Override public long getTerm() {
488             return term;
489         }
490
491         @Override public long getIndex() {
492             return index;
493         }
494     }
495
496
497     private static class Snapshot implements Serializable {
498         private final Object state;
499         private final List<ReplicatedLogEntry> unAppliedEntries;
500         private final long lastIndex;
501         private final long lastTerm;
502         private final long lastAppliedIndex;
503         private final long lastAppliedTerm;
504
505         private Snapshot(Object state,
506             List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
507             long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
508             this.state = state;
509             this.unAppliedEntries = unAppliedEntries;
510             this.lastIndex = lastIndex;
511             this.lastTerm = lastTerm;
512             this.lastAppliedIndex = lastAppliedIndex;
513             this.lastAppliedTerm = lastAppliedTerm;
514         }
515
516
517         public static Snapshot create(Object state,
518             List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
519             long lastAppliedIndex, long lastAppliedTerm) {
520             return new Snapshot(state, entries, lastIndex, lastTerm,
521                 lastAppliedIndex, lastAppliedTerm);
522         }
523
524         public Object getState() {
525             return state;
526         }
527
528         public List<ReplicatedLogEntry> getUnAppliedEntries() {
529             return unAppliedEntries;
530         }
531
532         public long getLastTerm() {
533             return lastTerm;
534         }
535
536         public long getLastAppliedIndex() {
537             return lastAppliedIndex;
538         }
539
540         public long getLastAppliedTerm() {
541             return lastAppliedTerm;
542         }
543     }
544
545
546 }