Fix bug in ReplicatedLogImpl#removeFrom and use akka-persistence for removing entries
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
28 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
31 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38
39 /**
40  * RaftActor encapsulates a state machine that needs to be kept synchronized
41  * in a cluster. It implements the RAFT algorithm as described in the paper
42  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
43  * In Search of an Understandable Consensus Algorithm</a>
44  * <p/>
45  * RaftActor has 3 states and each state has a certain behavior associated
46  * with it. A Raft actor can behave as,
47  * <ul>
48  * <li> A Leader </li>
49  * <li> A Follower (or) </li>
50  * <li> A Candidate </li>
51  * </ul>
52  * <p/>
53  * <p/>
54  * A RaftActor MUST be a Leader in order to accept requests from clients to
55  * change the state of it's encapsulated state machine. Once a RaftActor becomes
56  * a Leader it is also responsible for ensuring that all followers ultimately
57  * have the same log and therefore the same state machine as itself.
58  * <p/>
59  * <p/>
60  * The current behavior of a RaftActor determines how election for leadership
61  * is initiated and how peer RaftActors react to request for votes.
62  * <p/>
63  * <p/>
64  * Each RaftActor also needs to know the current election term. It uses this
65  * information for a couple of things. One is to simply figure out who it
66  * voted for in the last election. Another is to figure out if the message
67  * it received to update it's state is stale.
68  * <p/>
69  * <p/>
70  * The RaftActor uses akka-persistence to store it's replicated log.
71  * Furthermore through it's behaviors a Raft Actor determines
72  * <p/>
73  * <ul>
74  * <li> when a log entry should be persisted </li>
75  * <li> when a log entry should be applied to the state machine (and) </li>
76  * <li> when a snapshot should be saved </li>
77  * </ul>
78  */
79 public abstract class RaftActor extends UntypedPersistentActor {
80     protected final LoggingAdapter LOG =
81         Logging.getLogger(getContext().system(), this);
82
83     /**
84      * The current state determines the current behavior of a RaftActor
85      * A Raft Actor always starts off in the Follower State
86      */
87     private RaftActorBehavior currentBehavior;
88
89     /**
90      * This context should NOT be passed directly to any other actor it is
91      * only to be consumed by the RaftActorBehaviors
92      */
93     private RaftActorContext context;
94
95     /**
96      * The in-memory journal
97      */
98     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
99
100
101     public RaftActor(String id, Map<String, String> peerAddresses) {
102         final String id1 = getSelf().path().toString();
103         context = new RaftActorContextImpl(this.getSelf(),
104             this.getContext(),
105             id, new ElectionTermImpl(),
106             -1, -1, replicatedLog, peerAddresses, LOG);
107     }
108
109     @Override public void onReceiveRecover(Object message) {
110         if (message instanceof SnapshotOffer) {
111             SnapshotOffer offer = (SnapshotOffer) message;
112             Snapshot snapshot = (Snapshot) offer.snapshot();
113
114             // Create a replicated log with the snapshot information
115             // The replicated log can be used later on to retrieve this snapshot
116             // when we need to install it on a peer
117             replicatedLog = new ReplicatedLogImpl(snapshot);
118
119             // Apply the snapshot to the actors state
120             applySnapshot(snapshot.getState());
121
122         } else if (message instanceof ReplicatedLogEntry) {
123             replicatedLog.append((ReplicatedLogEntry) message);
124         } else if (message instanceof DeleteEntries) {
125             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
126         } else if (message instanceof UpdateElectionTerm) {
127             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
128         } else if (message instanceof RecoveryCompleted) {
129             LOG.debug(
130                 "Last index in log : " + replicatedLog.lastIndex());
131             currentBehavior = switchBehavior(RaftState.Follower);
132         }
133     }
134
135     @Override public void onReceiveCommand(Object message) {
136         if (message instanceof ApplyState){
137             ApplyState applyState = (ApplyState) message;
138
139             LOG.debug("Applying state for log index {} data {}",
140                 applyState.getReplicatedLogEntry().getIndex(),
141                 applyState.getReplicatedLogEntry().getData());
142
143             applyState(applyState.getClientActor(), applyState.getIdentifier(),
144                 applyState.getReplicatedLogEntry().getData());
145         } else if(message instanceof ApplySnapshot ) {
146             applySnapshot(((ApplySnapshot) message).getSnapshot());
147         } else if (message instanceof FindLeader) {
148             getSender().tell(
149                 new FindLeaderReply(
150                     context.getPeerAddress(currentBehavior.getLeaderId())),
151                 getSelf()
152             );
153         } else if (message instanceof SaveSnapshotSuccess) {
154             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
155
156             // TODO: Not sure if we want to be this aggressive with trimming stuff
157             trimPersistentData(success.metadata().sequenceNr());
158
159         } else if (message instanceof SaveSnapshotFailure) {
160             // TODO: Handle failure in saving the snapshot
161         } else if (message instanceof FindLeader){
162             getSender().tell(new FindLeaderReply(
163                 context.getPeerAddress(currentBehavior.getLeaderId())),
164                 getSelf());
165
166         } else if (message instanceof AddRaftPeer){
167             AddRaftPeer arp = (AddRaftPeer)message;
168            context.addToPeers(arp.getName(), arp.getAddress());
169
170         } else if (message instanceof RemoveRaftPeer){
171             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
172             context.removePeer(rrp.getName());
173         } else {
174             RaftState state =
175                 currentBehavior.handleMessage(getSender(), message);
176             currentBehavior = switchBehavior(state);
177         }
178     }
179
180
181
182     /**
183      * When a derived RaftActor needs to persist something it must call
184      * persistData.
185      *
186      * @param clientActor
187      * @param identifier
188      * @param data
189      */
190     protected void persistData(ActorRef clientActor, String identifier,
191         Object data) {
192
193         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
194             context.getReplicatedLog().lastIndex() + 1,
195             context.getTermInformation().getCurrentTerm(), data);
196
197         LOG.debug("Persist data {}", replicatedLogEntry);
198
199         replicatedLog
200             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
201     }
202
203     protected String getId() {
204         return context.getId();
205     }
206
207     /**
208      * Derived actors can call the isLeader method to check if the current
209      * RaftActor is the Leader or not
210      *
211      * @return true it this RaftActor is a Leader false otherwise
212      */
213     protected boolean isLeader() {
214         return context.getId().equals(currentBehavior.getLeaderId());
215     }
216
217     /**
218      * Derived actor can call getLeader if they need a reference to the Leader.
219      * This would be useful for example in forwarding a request to an actor
220      * which is the leader
221      *
222      * @return A reference to the leader if known, null otherwise
223      */
224     protected ActorSelection getLeader(){
225         String leaderId = currentBehavior.getLeaderId();
226         if (leaderId == null) {
227             return null;
228         }
229         String peerAddress = context.getPeerAddress(leaderId);
230         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
231             + peerAddress);
232         return context.actorSelection(peerAddress);
233     }
234
235     protected RaftState getRaftState() {
236         return currentBehavior.state();
237     }
238
239
240
241     /**
242      * The applyState method will be called by the RaftActor when some data
243      * needs to be applied to the actor's state
244      *
245      * @param clientActor A reference to the client who sent this message. This
246      *                    is the same reference that was passed to persistData
247      *                    by the derived actor. clientActor may be null when
248      *                    the RaftActor is behaving as a follower or during
249      *                    recovery.
250      * @param identifier  The identifier of the persisted data. This is also
251      *                    the same identifier that was passed to persistData by
252      *                    the derived actor. identifier may be null when
253      *                    the RaftActor is behaving as a follower or during
254      *                    recovery
255      * @param data        A piece of data that was persisted by the persistData call.
256      *                    This should NEVER be null.
257      */
258     protected abstract void applyState(ActorRef clientActor, String identifier,
259         Object data);
260
261     /**
262      * This method will be called by the RaftActor when a snapshot needs to be
263      * created. The derived actor should respond with its current state.
264      * <p/>
265      * During recovery the state that is returned by the derived actor will
266      * be passed back to it by calling the applySnapshot  method
267      *
268      * @return The current state of the actor
269      */
270     protected abstract Object createSnapshot();
271
272     /**
273      * This method will be called by the RaftActor during recovery to
274      * reconstruct the state of the actor.
275      * <p/>
276      * This method may also be called at any other point during normal
277      * operations when the derived actor is out of sync with it's peers
278      * and the only way to bring it in sync is by applying a snapshot
279      *
280      * @param snapshot A snapshot of the state of the actor
281      */
282     protected abstract void applySnapshot(Object snapshot);
283
284     private RaftActorBehavior switchBehavior(RaftState state) {
285         if (currentBehavior != null) {
286             if (currentBehavior.state() == state) {
287                 return currentBehavior;
288             }
289             LOG.info("Switching from state " + currentBehavior.state() + " to "
290                 + state);
291
292             try {
293                 currentBehavior.close();
294             } catch (Exception e) {
295                 LOG.error(e,
296                     "Failed to close behavior : " + currentBehavior.state());
297             }
298
299         } else {
300             LOG.info("Switching behavior to " + state);
301         }
302         RaftActorBehavior behavior = null;
303         if (state == RaftState.Candidate) {
304             behavior = new Candidate(context);
305         } else if (state == RaftState.Follower) {
306             behavior = new Follower(context);
307         } else {
308             behavior = new Leader(context);
309         }
310         return behavior;
311     }
312
313     private void trimPersistentData(long sequenceNumber) {
314         // Trim snapshots
315         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
316         // For now guessing that it is ANDed.
317         deleteSnapshots(new SnapshotSelectionCriteria(
318             sequenceNumber - 100000, 43200000));
319
320         // Trim journal
321         deleteMessages(sequenceNumber);
322     }
323
324
325     private class ReplicatedLogImpl implements ReplicatedLog {
326         private final List<ReplicatedLogEntry> journal;
327         private final Object snapshot;
328         private long snapshotIndex = -1;
329         private long snapshotTerm = -1;
330
331         public ReplicatedLogImpl(Snapshot snapshot) {
332             this.snapshot = snapshot.getState();
333             this.snapshotIndex = snapshot.getLastAppliedIndex();
334             this.snapshotTerm = snapshot.getLastAppliedTerm();
335
336             this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
337         }
338
339         public ReplicatedLogImpl() {
340             this.snapshot = null;
341             this.journal = new ArrayList<>();
342         }
343
344         @Override public ReplicatedLogEntry get(long index) {
345             int adjustedIndex = adjustedIndex(index);
346
347             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
348                 return null;
349             }
350
351             return journal.get(adjustedIndex);
352         }
353
354         @Override public ReplicatedLogEntry last() {
355             if (journal.size() == 0) {
356                 return null;
357             }
358             return get(journal.size() - 1);
359         }
360
361         @Override public long lastIndex() {
362             if (journal.size() == 0) {
363                 return -1;
364             }
365
366             return last().getIndex();
367         }
368
369         @Override public long lastTerm() {
370             if (journal.size() == 0) {
371                 return -1;
372             }
373
374             return last().getTerm();
375         }
376
377
378         @Override public void removeFrom(long index) {
379             int adjustedIndex = adjustedIndex(index);
380
381             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
382                 return;
383             }
384
385             journal.subList(adjustedIndex , journal.size()).clear();
386         }
387
388
389         @Override public void removeFromAndPersist(long index) {
390             int adjustedIndex = adjustedIndex(index);
391
392             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
393                 return;
394             }
395
396             // FIXME: Maybe this should be done after the command is saved
397             journal.subList(adjustedIndex , journal.size()).clear();
398
399             persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
400
401                 @Override public void apply(DeleteEntries param)
402                     throws Exception {
403                     //FIXME : Doing nothing for now
404                 }
405             });
406
407
408         }
409
410         @Override public void append(
411             final ReplicatedLogEntry replicatedLogEntry) {
412             journal.add(replicatedLogEntry);
413         }
414
415         @Override public List<ReplicatedLogEntry> getFrom(long index) {
416             int adjustedIndex = adjustedIndex(index);
417
418             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
419             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
420                 return entries;
421             }
422
423
424             for (int i = adjustedIndex;
425                  i < journal.size(); i++) {
426                 entries.add(journal.get(i));
427             }
428             return entries;
429         }
430
431         @Override public void appendAndPersist(
432             final ReplicatedLogEntry replicatedLogEntry) {
433             appendAndPersist(null, null, replicatedLogEntry);
434         }
435
436         public void appendAndPersist(final ActorRef clientActor,
437             final String identifier,
438             final ReplicatedLogEntry replicatedLogEntry) {
439             context.getLogger().debug(
440                 "Append log entry and persist " + replicatedLogEntry);
441             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
442             journal.add(replicatedLogEntry);
443
444             // When persisting events with persist it is guaranteed that the
445             // persistent actor will not receive further commands between the
446             // persist call and the execution(s) of the associated event
447             // handler. This also holds for multiple persist calls in context
448             // of a single command.
449             persist(replicatedLogEntry,
450                 new Procedure<ReplicatedLogEntry>() {
451                     public void apply(ReplicatedLogEntry evt) throws Exception {
452                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
453                         if (size() > 100000) {
454                             ReplicatedLogEntry lastAppliedEntry =
455                                 get(context.getLastApplied());
456                             long lastAppliedIndex = -1;
457                             long lastAppliedTerm = -1;
458                             if (lastAppliedEntry != null) {
459                                 lastAppliedIndex = lastAppliedEntry.getIndex();
460                                 lastAppliedTerm = lastAppliedEntry.getTerm();
461                             }
462
463                             saveSnapshot(Snapshot.create(createSnapshot(),
464                                 getFrom(context.getLastApplied() + 1),
465                                 lastIndex(), lastTerm(), lastAppliedIndex,
466                                 lastAppliedTerm));
467                         }
468                         // Send message for replication
469                         if (clientActor != null) {
470                             currentBehavior.handleMessage(getSelf(),
471                                 new Replicate(clientActor, identifier,
472                                     replicatedLogEntry)
473                             );
474                         }
475                     }
476                 }
477             );
478         }
479
480         @Override public long size() {
481             return journal.size() + snapshotIndex + 1;
482         }
483
484         @Override public boolean isPresent(long index) {
485             int adjustedIndex = adjustedIndex(index);
486
487             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
488                 return false;
489             }
490             return true;
491         }
492
493         @Override public boolean isInSnapshot(long index) {
494             return index <= snapshotIndex;
495         }
496
497         @Override public Object getSnapshot() {
498             return snapshot;
499         }
500
501         @Override public long getSnapshotIndex() {
502             return snapshotIndex;
503         }
504
505         @Override public long getSnapshotTerm() {
506             return snapshotTerm;
507         }
508
509         private int adjustedIndex(long index) {
510             if(snapshotIndex < 0){
511                 return (int) index;
512             }
513             return (int) (index - snapshotIndex);
514         }
515     }
516
517
518     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
519         Serializable {
520
521         private final long index;
522         private final long term;
523         private final Object payload;
524
525         public ReplicatedLogImplEntry(long index, long term, Object payload) {
526
527             this.index = index;
528             this.term = term;
529             this.payload = payload;
530         }
531
532         @Override public Object getData() {
533             return payload;
534         }
535
536         @Override public long getTerm() {
537             return term;
538         }
539
540         @Override public long getIndex() {
541             return index;
542         }
543
544         @Override public String toString() {
545             return "Entry{" +
546                 "index=" + index +
547                 ", term=" + term +
548                 '}';
549         }
550     }
551
552     private static class DeleteEntries implements Serializable {
553         private final int fromIndex;
554
555
556         public DeleteEntries(int fromIndex) {
557             this.fromIndex = fromIndex;
558         }
559
560         public int getFromIndex() {
561             return fromIndex;
562         }
563     }
564
565
566     private static class Snapshot implements Serializable {
567         private final Object state;
568         private final List<ReplicatedLogEntry> unAppliedEntries;
569         private final long lastIndex;
570         private final long lastTerm;
571         private final long lastAppliedIndex;
572         private final long lastAppliedTerm;
573
574         private Snapshot(Object state,
575             List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
576             long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
577             this.state = state;
578             this.unAppliedEntries = unAppliedEntries;
579             this.lastIndex = lastIndex;
580             this.lastTerm = lastTerm;
581             this.lastAppliedIndex = lastAppliedIndex;
582             this.lastAppliedTerm = lastAppliedTerm;
583         }
584
585
586         public static Snapshot create(Object state,
587             List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
588             long lastAppliedIndex, long lastAppliedTerm) {
589             return new Snapshot(state, entries, lastIndex, lastTerm,
590                 lastAppliedIndex, lastAppliedTerm);
591         }
592
593         public Object getState() {
594             return state;
595         }
596
597         public List<ReplicatedLogEntry> getUnAppliedEntries() {
598             return unAppliedEntries;
599         }
600
601         public long getLastTerm() {
602             return lastTerm;
603         }
604
605         public long getLastAppliedIndex() {
606             return lastAppliedIndex;
607         }
608
609         public long getLastAppliedTerm() {
610             return lastAppliedTerm;
611         }
612     }
613
614     private class ElectionTermImpl implements ElectionTerm {
615         /**
616          * Identifier of the actor whose election term information this is
617          */
618         private long currentTerm = 0;
619         private String votedFor = null;
620
621         public long getCurrentTerm() {
622             return currentTerm;
623         }
624
625         public String getVotedFor() {
626             return votedFor;
627         }
628
629         @Override public void update(long currentTerm, String votedFor) {
630             LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
631
632             this.currentTerm = currentTerm;
633             this.votedFor = votedFor;
634         }
635
636         @Override
637         public void updateAndPersist(long currentTerm, String votedFor){
638             update(currentTerm, votedFor);
639             // FIXME : Maybe first persist then update the state
640             persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
641
642                 @Override public void apply(UpdateElectionTerm param)
643                     throws Exception {
644
645                 }
646             });
647         }
648     }
649
650     private static class UpdateElectionTerm implements Serializable {
651         private final long currentTerm;
652         private final String votedFor;
653
654         public UpdateElectionTerm(long currentTerm, String votedFor) {
655             this.currentTerm = currentTerm;
656             this.votedFor = votedFor;
657         }
658
659         public long getCurrentTerm() {
660             return currentTerm;
661         }
662
663         public String getVotedFor() {
664             return votedFor;
665         }
666     }
667
668 }