9b7940838dd0b22980a8002a47d501a2327a7e1a
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
28 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
31 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38
39 /**
40  * RaftActor encapsulates a state machine that needs to be kept synchronized
41  * in a cluster. It implements the RAFT algorithm as described in the paper
42  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
43  * In Search of an Understandable Consensus Algorithm</a>
44  * <p/>
45  * RaftActor has 3 states and each state has a certain behavior associated
46  * with it. A Raft actor can behave as,
47  * <ul>
48  * <li> A Leader </li>
49  * <li> A Follower (or) </li>
50  * <li> A Candidate </li>
51  * </ul>
52  * <p/>
53  * <p/>
54  * A RaftActor MUST be a Leader in order to accept requests from clients to
55  * change the state of it's encapsulated state machine. Once a RaftActor becomes
56  * a Leader it is also responsible for ensuring that all followers ultimately
57  * have the same log and therefore the same state machine as itself.
58  * <p/>
59  * <p/>
60  * The current behavior of a RaftActor determines how election for leadership
61  * is initiated and how peer RaftActors react to request for votes.
62  * <p/>
63  * <p/>
64  * Each RaftActor also needs to know the current election term. It uses this
65  * information for a couple of things. One is to simply figure out who it
66  * voted for in the last election. Another is to figure out if the message
67  * it received to update it's state is stale.
68  * <p/>
69  * <p/>
70  * The RaftActor uses akka-persistence to store it's replicated log.
71  * Furthermore through it's behaviors a Raft Actor determines
72  * <p/>
73  * <ul>
74  * <li> when a log entry should be persisted </li>
75  * <li> when a log entry should be applied to the state machine (and) </li>
76  * <li> when a snapshot should be saved </li>
77  * </ul>
78  */
79 public abstract class RaftActor extends UntypedPersistentActor {
80     protected final LoggingAdapter LOG =
81         Logging.getLogger(getContext().system(), this);
82
83     /**
84      * The current state determines the current behavior of a RaftActor
85      * A Raft Actor always starts off in the Follower State
86      */
87     private RaftActorBehavior currentBehavior;
88
89     /**
90      * This context should NOT be passed directly to any other actor it is
91      * only to be consumed by the RaftActorBehaviors
92      */
93     private RaftActorContext context;
94
95     /**
96      * The in-memory journal
97      */
98     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
99
100
101     public RaftActor(String id, Map<String, String> peerAddresses) {
102         context = new RaftActorContextImpl(this.getSelf(),
103             this.getContext(),
104             id, new ElectionTermImpl(getSelf().path().toString()),
105             -1, -1, replicatedLog, peerAddresses, LOG);
106     }
107
108     @Override public void onReceiveRecover(Object message) {
109         if (message instanceof SnapshotOffer) {
110             SnapshotOffer offer = (SnapshotOffer) message;
111             Snapshot snapshot = (Snapshot) offer.snapshot();
112
113             // Create a replicated log with the snapshot information
114             // The replicated log can be used later on to retrieve this snapshot
115             // when we need to install it on a peer
116             replicatedLog = new ReplicatedLogImpl(snapshot);
117
118             // Apply the snapshot to the actors state
119             applySnapshot(snapshot.getState());
120
121         } else if (message instanceof ReplicatedLogEntry) {
122             replicatedLog.append((ReplicatedLogEntry) message);
123         } else if (message instanceof RecoveryCompleted) {
124             LOG.debug(
125                 "Last index in log : " + replicatedLog.lastIndex());
126             currentBehavior = switchBehavior(RaftState.Follower);
127         }
128     }
129
130     @Override public void onReceiveCommand(Object message) {
131         if (message instanceof ApplyState){
132             ApplyState applyState = (ApplyState) message;
133
134             LOG.debug("Applying state for log index {} data {}",
135                 applyState.getReplicatedLogEntry().getIndex(),
136                 applyState.getReplicatedLogEntry().getData());
137
138             applyState(applyState.getClientActor(), applyState.getIdentifier(),
139                 applyState.getReplicatedLogEntry().getData());
140         } else if(message instanceof ApplySnapshot ) {
141             applySnapshot(((ApplySnapshot) message).getSnapshot());
142         } else if (message instanceof FindLeader) {
143             getSender().tell(
144                 new FindLeaderReply(
145                     context.getPeerAddress(currentBehavior.getLeaderId())),
146                 getSelf()
147             );
148         } else if (message instanceof SaveSnapshotSuccess) {
149             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
150
151             // TODO: Not sure if we want to be this aggressive with trimming stuff
152             trimPersistentData(success.metadata().sequenceNr());
153
154         } else if (message instanceof SaveSnapshotFailure) {
155             // TODO: Handle failure in saving the snapshot
156         } else if (message instanceof FindLeader){
157             getSender().tell(new FindLeaderReply(
158                 context.getPeerAddress(currentBehavior.getLeaderId())),
159                 getSelf());
160
161         } else if (message instanceof AddRaftPeer){
162             AddRaftPeer arp = (AddRaftPeer)message;
163            context.addToPeers(arp.getName(), arp.getAddress());
164
165         } else if (message instanceof RemoveRaftPeer){
166             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
167             context.removePeer(rrp.getName());
168         } else {
169             RaftState state =
170                 currentBehavior.handleMessage(getSender(), message);
171             currentBehavior = switchBehavior(state);
172         }
173     }
174
175
176
177     /**
178      * When a derived RaftActor needs to persist something it must call
179      * persistData.
180      *
181      * @param clientActor
182      * @param identifier
183      * @param data
184      */
185     protected void persistData(ActorRef clientActor, String identifier,
186         Object data) {
187
188         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
189             context.getReplicatedLog().lastIndex() + 1,
190             context.getTermInformation().getCurrentTerm(), data);
191
192         LOG.debug("Persist data {}", replicatedLogEntry);
193
194         replicatedLog
195             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
196     }
197
198     protected String getId() {
199         return context.getId();
200     }
201
202     /**
203      * Derived actors can call the isLeader method to check if the current
204      * RaftActor is the Leader or not
205      *
206      * @return true it this RaftActor is a Leader false otherwise
207      */
208     protected boolean isLeader() {
209         return context.getId().equals(currentBehavior.getLeaderId());
210     }
211
212     /**
213      * Derived actor can call getLeader if they need a reference to the Leader.
214      * This would be useful for example in forwarding a request to an actor
215      * which is the leader
216      *
217      * @return A reference to the leader if known, null otherwise
218      */
219     protected ActorSelection getLeader(){
220         String leaderId = currentBehavior.getLeaderId();
221         if (leaderId == null) {
222             return null;
223         }
224         String peerAddress = context.getPeerAddress(leaderId);
225         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
226             + peerAddress);
227         return context.actorSelection(peerAddress);
228     }
229
230     protected RaftState getRaftState() {
231         return currentBehavior.state();
232     }
233
234
235
236     /**
237      * The applyState method will be called by the RaftActor when some data
238      * needs to be applied to the actor's state
239      *
240      * @param clientActor A reference to the client who sent this message. This
241      *                    is the same reference that was passed to persistData
242      *                    by the derived actor. clientActor may be null when
243      *                    the RaftActor is behaving as a follower or during
244      *                    recovery.
245      * @param identifier  The identifier of the persisted data. This is also
246      *                    the same identifier that was passed to persistData by
247      *                    the derived actor. identifier may be null when
248      *                    the RaftActor is behaving as a follower or during
249      *                    recovery
250      * @param data        A piece of data that was persisted by the persistData call.
251      *                    This should NEVER be null.
252      */
253     protected abstract void applyState(ActorRef clientActor, String identifier,
254         Object data);
255
256     /**
257      * This method will be called by the RaftActor when a snapshot needs to be
258      * created. The derived actor should respond with its current state.
259      * <p/>
260      * During recovery the state that is returned by the derived actor will
261      * be passed back to it by calling the applySnapshot  method
262      *
263      * @return The current state of the actor
264      */
265     protected abstract Object createSnapshot();
266
267     /**
268      * This method will be called by the RaftActor during recovery to
269      * reconstruct the state of the actor.
270      * <p/>
271      * This method may also be called at any other point during normal
272      * operations when the derived actor is out of sync with it's peers
273      * and the only way to bring it in sync is by applying a snapshot
274      *
275      * @param snapshot A snapshot of the state of the actor
276      */
277     protected abstract void applySnapshot(Object snapshot);
278
279     private RaftActorBehavior switchBehavior(RaftState state) {
280         if (currentBehavior != null) {
281             if (currentBehavior.state() == state) {
282                 return currentBehavior;
283             }
284             LOG.info("Switching from state " + currentBehavior.state() + " to "
285                 + state);
286
287             try {
288                 currentBehavior.close();
289             } catch (Exception e) {
290                 LOG.error(e,
291                     "Failed to close behavior : " + currentBehavior.state());
292             }
293
294         } else {
295             LOG.info("Switching behavior to " + state);
296         }
297         RaftActorBehavior behavior = null;
298         if (state == RaftState.Candidate) {
299             behavior = new Candidate(context);
300         } else if (state == RaftState.Follower) {
301             behavior = new Follower(context);
302         } else {
303             behavior = new Leader(context);
304         }
305         return behavior;
306     }
307
308     private void trimPersistentData(long sequenceNumber) {
309         // Trim snapshots
310         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
311         // For now guessing that it is ANDed.
312         deleteSnapshots(new SnapshotSelectionCriteria(
313             sequenceNumber - 100000, 43200000));
314
315         // Trim journal
316         deleteMessages(sequenceNumber);
317     }
318
319     private class ReplicatedLogImpl implements ReplicatedLog {
320         private final List<ReplicatedLogEntry> journal;
321         private final Object snapshot;
322         private long snapshotIndex = -1;
323         private long snapshotTerm = -1;
324
325         public ReplicatedLogImpl(Snapshot snapshot) {
326             this.snapshot = snapshot.getState();
327             this.snapshotIndex = snapshot.getLastAppliedIndex();
328             this.snapshotTerm = snapshot.getLastAppliedTerm();
329
330             this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
331         }
332
333         public ReplicatedLogImpl() {
334             this.snapshot = null;
335             this.journal = new ArrayList<>();
336         }
337
338         @Override public ReplicatedLogEntry get(long index) {
339             int adjustedIndex = adjustedIndex(index);
340
341             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
342                 return null;
343             }
344
345             return journal.get(adjustedIndex);
346         }
347
348         @Override public ReplicatedLogEntry last() {
349             if (journal.size() == 0) {
350                 return null;
351             }
352             return get(journal.size() - 1);
353         }
354
355         @Override public long lastIndex() {
356             if (journal.size() == 0) {
357                 return -1;
358             }
359
360             return last().getIndex();
361         }
362
363         @Override public long lastTerm() {
364             if (journal.size() == 0) {
365                 return -1;
366             }
367
368             return last().getTerm();
369         }
370
371
372         @Override public void removeFrom(long index) {
373             int adjustedIndex = adjustedIndex(index);
374
375             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
376                 return;
377             }
378             for (int i = adjustedIndex;
379                  i < journal.size(); i++) {
380                 deleteMessage(i);
381                 journal.remove(i);
382             }
383         }
384
385         @Override public void append(
386             final ReplicatedLogEntry replicatedLogEntry) {
387             journal.add(replicatedLogEntry);
388         }
389
390         @Override public List<ReplicatedLogEntry> getFrom(long index) {
391             int adjustedIndex = adjustedIndex(index);
392
393             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
394             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
395                 return entries;
396             }
397             for (int i = adjustedIndex;
398                  i < journal.size(); i++) {
399                 entries.add(journal.get(i));
400             }
401             return entries;
402         }
403
404         @Override public void appendAndPersist(
405             final ReplicatedLogEntry replicatedLogEntry) {
406             appendAndPersist(null, null, replicatedLogEntry);
407         }
408
409         public void appendAndPersist(final ActorRef clientActor,
410             final String identifier,
411             final ReplicatedLogEntry replicatedLogEntry) {
412             context.getLogger().debug(
413                 "Append log entry and persist " + replicatedLogEntry);
414             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
415             journal.add(replicatedLogEntry);
416
417             // When persisting events with persist it is guaranteed that the
418             // persistent actor will not receive further commands between the
419             // persist call and the execution(s) of the associated event
420             // handler. This also holds for multiple persist calls in context
421             // of a single command.
422             persist(replicatedLogEntry,
423                 new Procedure<ReplicatedLogEntry>() {
424                     public void apply(ReplicatedLogEntry evt) throws Exception {
425                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
426                         if (size() > 100000) {
427                             ReplicatedLogEntry lastAppliedEntry =
428                                 get(context.getLastApplied());
429                             long lastAppliedIndex = -1;
430                             long lastAppliedTerm = -1;
431                             if (lastAppliedEntry != null) {
432                                 lastAppliedIndex = lastAppliedEntry.getIndex();
433                                 lastAppliedTerm = lastAppliedEntry.getTerm();
434                             }
435
436                             saveSnapshot(Snapshot.create(createSnapshot(),
437                                 getFrom(context.getLastApplied() + 1),
438                                 lastIndex(), lastTerm(), lastAppliedIndex,
439                                 lastAppliedTerm));
440                         }
441                         // Send message for replication
442                         if (clientActor != null) {
443                             currentBehavior.handleMessage(getSelf(),
444                                 new Replicate(clientActor, identifier,
445                                     replicatedLogEntry)
446                             );
447                         }
448                     }
449                 }
450             );
451         }
452
453         @Override public long size() {
454             return journal.size() + snapshotIndex + 1;
455         }
456
457         @Override public boolean isPresent(long index) {
458             int adjustedIndex = adjustedIndex(index);
459
460             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
461                 return false;
462             }
463             return true;
464         }
465
466         @Override public boolean isInSnapshot(long index) {
467             return index <= snapshotIndex;
468         }
469
470         @Override public Object getSnapshot() {
471             return snapshot;
472         }
473
474         @Override public long getSnapshotIndex() {
475             return snapshotIndex;
476         }
477
478         @Override public long getSnapshotTerm() {
479             return snapshotTerm;
480         }
481
482         private int adjustedIndex(long index) {
483             if(snapshotIndex < 0){
484                 return (int) index;
485             }
486             return (int) (index - snapshotIndex);
487         }
488     }
489
490
491     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
492         Serializable {
493
494         private final long index;
495         private final long term;
496         private final Object payload;
497
498         public ReplicatedLogImplEntry(long index, long term, Object payload) {
499
500             this.index = index;
501             this.term = term;
502             this.payload = payload;
503         }
504
505         @Override public Object getData() {
506             return payload;
507         }
508
509         @Override public long getTerm() {
510             return term;
511         }
512
513         @Override public long getIndex() {
514             return index;
515         }
516
517         @Override public String toString() {
518             return "Entry{" +
519                 "index=" + index +
520                 ", term=" + term +
521                 '}';
522         }
523     }
524
525
526     private static class Snapshot implements Serializable {
527         private final Object state;
528         private final List<ReplicatedLogEntry> unAppliedEntries;
529         private final long lastIndex;
530         private final long lastTerm;
531         private final long lastAppliedIndex;
532         private final long lastAppliedTerm;
533
534         private Snapshot(Object state,
535             List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
536             long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
537             this.state = state;
538             this.unAppliedEntries = unAppliedEntries;
539             this.lastIndex = lastIndex;
540             this.lastTerm = lastTerm;
541             this.lastAppliedIndex = lastAppliedIndex;
542             this.lastAppliedTerm = lastAppliedTerm;
543         }
544
545
546         public static Snapshot create(Object state,
547             List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
548             long lastAppliedIndex, long lastAppliedTerm) {
549             return new Snapshot(state, entries, lastIndex, lastTerm,
550                 lastAppliedIndex, lastAppliedTerm);
551         }
552
553         public Object getState() {
554             return state;
555         }
556
557         public List<ReplicatedLogEntry> getUnAppliedEntries() {
558             return unAppliedEntries;
559         }
560
561         public long getLastTerm() {
562             return lastTerm;
563         }
564
565         public long getLastAppliedIndex() {
566             return lastAppliedIndex;
567         }
568
569         public long getLastAppliedTerm() {
570             return lastAppliedTerm;
571         }
572     }
573
574
575 }