Merge "Improve RESTCONF thread safety when reconfiguring"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
28 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
31 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38
39 /**
40  * RaftActor encapsulates a state machine that needs to be kept synchronized
41  * in a cluster. It implements the RAFT algorithm as described in the paper
42  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
43  * In Search of an Understandable Consensus Algorithm</a>
44  * <p/>
45  * RaftActor has 3 states and each state has a certain behavior associated
46  * with it. A Raft actor can behave as,
47  * <ul>
48  * <li> A Leader </li>
49  * <li> A Follower (or) </li>
50  * <li> A Candidate </li>
51  * </ul>
52  * <p/>
53  * <p/>
54  * A RaftActor MUST be a Leader in order to accept requests from clients to
55  * change the state of it's encapsulated state machine. Once a RaftActor becomes
56  * a Leader it is also responsible for ensuring that all followers ultimately
57  * have the same log and therefore the same state machine as itself.
58  * <p/>
59  * <p/>
60  * The current behavior of a RaftActor determines how election for leadership
61  * is initiated and how peer RaftActors react to request for votes.
62  * <p/>
63  * <p/>
64  * Each RaftActor also needs to know the current election term. It uses this
65  * information for a couple of things. One is to simply figure out who it
66  * voted for in the last election. Another is to figure out if the message
67  * it received to update it's state is stale.
68  * <p/>
69  * <p/>
70  * The RaftActor uses akka-persistence to store it's replicated log.
71  * Furthermore through it's behaviors a Raft Actor determines
72  * <p/>
73  * <ul>
74  * <li> when a log entry should be persisted </li>
75  * <li> when a log entry should be applied to the state machine (and) </li>
76  * <li> when a snapshot should be saved </li>
77  * </ul>
78  */
79 public abstract class RaftActor extends UntypedPersistentActor {
80     protected final LoggingAdapter LOG =
81         Logging.getLogger(getContext().system(), this);
82
83     /**
84      * The current state determines the current behavior of a RaftActor
85      * A Raft Actor always starts off in the Follower State
86      */
87     private RaftActorBehavior currentBehavior;
88
89     /**
90      * This context should NOT be passed directly to any other actor it is
91      * only to be consumed by the RaftActorBehaviors
92      */
93     private RaftActorContext context;
94
95     /**
96      * The in-memory journal
97      */
98     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
99
100
101     public RaftActor(String id, Map<String, String> peerAddresses) {
102         context = new RaftActorContextImpl(this.getSelf(),
103             this.getContext(),
104             id, new ElectionTermImpl(),
105             -1, -1, replicatedLog, peerAddresses, LOG);
106     }
107
108     @Override public void onReceiveRecover(Object message) {
109         if (message instanceof SnapshotOffer) {
110             SnapshotOffer offer = (SnapshotOffer) message;
111             Snapshot snapshot = (Snapshot) offer.snapshot();
112
113             // Create a replicated log with the snapshot information
114             // The replicated log can be used later on to retrieve this snapshot
115             // when we need to install it on a peer
116             replicatedLog = new ReplicatedLogImpl(snapshot);
117
118             // Apply the snapshot to the actors state
119             applySnapshot(snapshot.getState());
120
121         } else if (message instanceof ReplicatedLogEntry) {
122             replicatedLog.append((ReplicatedLogEntry) message);
123         } else if (message instanceof DeleteEntries) {
124             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
125         } else if (message instanceof UpdateElectionTerm) {
126             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
127         } else if (message instanceof RecoveryCompleted) {
128             LOG.debug(
129                 "Last index in log : " + replicatedLog.lastIndex());
130             currentBehavior = switchBehavior(RaftState.Follower);
131         }
132     }
133
134     @Override public void onReceiveCommand(Object message) {
135         if (message instanceof ApplyState){
136             ApplyState applyState = (ApplyState) message;
137
138             LOG.debug("Applying state for log index {} data {}",
139                 applyState.getReplicatedLogEntry().getIndex(),
140                 applyState.getReplicatedLogEntry().getData());
141
142             applyState(applyState.getClientActor(), applyState.getIdentifier(),
143                 applyState.getReplicatedLogEntry().getData());
144         } else if(message instanceof ApplySnapshot ) {
145             applySnapshot(((ApplySnapshot) message).getSnapshot());
146         } else if (message instanceof FindLeader) {
147             getSender().tell(
148                 new FindLeaderReply(
149                     context.getPeerAddress(currentBehavior.getLeaderId())),
150                 getSelf()
151             );
152         } else if (message instanceof SaveSnapshotSuccess) {
153             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
154
155             // TODO: Not sure if we want to be this aggressive with trimming stuff
156             trimPersistentData(success.metadata().sequenceNr());
157
158         } else if (message instanceof SaveSnapshotFailure) {
159
160             // TODO: Handle failure in saving the snapshot
161             // Maybe do retries on failure
162
163         } else if (message instanceof AddRaftPeer){
164
165             // FIXME : Do not add raft peers like this.
166             // When adding a new Peer we have to ensure that the a majority of
167             // the peers know about the new Peer. Doing it this way may cause
168             // a situation where multiple Leaders may emerge
169             AddRaftPeer arp = (AddRaftPeer)message;
170            context.addToPeers(arp.getName(), arp.getAddress());
171
172         } else if (message instanceof RemoveRaftPeer){
173
174             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
175             context.removePeer(rrp.getName());
176
177         } else {
178
179             RaftState state =
180                 currentBehavior.handleMessage(getSender(), message);
181             currentBehavior = switchBehavior(state);
182         }
183     }
184
185
186
187     /**
188      * When a derived RaftActor needs to persist something it must call
189      * persistData.
190      *
191      * @param clientActor
192      * @param identifier
193      * @param data
194      */
195     protected void persistData(ActorRef clientActor, String identifier,
196         Object data) {
197
198         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
199             context.getReplicatedLog().lastIndex() + 1,
200             context.getTermInformation().getCurrentTerm(), data);
201
202         LOG.debug("Persist data {}", replicatedLogEntry);
203
204         replicatedLog
205             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
206     }
207
208     protected String getId() {
209         return context.getId();
210     }
211
212     /**
213      * Derived actors can call the isLeader method to check if the current
214      * RaftActor is the Leader or not
215      *
216      * @return true it this RaftActor is a Leader false otherwise
217      */
218     protected boolean isLeader() {
219         return context.getId().equals(currentBehavior.getLeaderId());
220     }
221
222     /**
223      * Derived actor can call getLeader if they need a reference to the Leader.
224      * This would be useful for example in forwarding a request to an actor
225      * which is the leader
226      *
227      * @return A reference to the leader if known, null otherwise
228      */
229     protected ActorSelection getLeader(){
230         String leaderId = currentBehavior.getLeaderId();
231         if (leaderId == null) {
232             return null;
233         }
234         String peerAddress = context.getPeerAddress(leaderId);
235         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
236             + peerAddress);
237         return context.actorSelection(peerAddress);
238     }
239
240     protected RaftState getRaftState() {
241         return currentBehavior.state();
242     }
243
244
245
246     /**
247      * The applyState method will be called by the RaftActor when some data
248      * needs to be applied to the actor's state
249      *
250      * @param clientActor A reference to the client who sent this message. This
251      *                    is the same reference that was passed to persistData
252      *                    by the derived actor. clientActor may be null when
253      *                    the RaftActor is behaving as a follower or during
254      *                    recovery.
255      * @param identifier  The identifier of the persisted data. This is also
256      *                    the same identifier that was passed to persistData by
257      *                    the derived actor. identifier may be null when
258      *                    the RaftActor is behaving as a follower or during
259      *                    recovery
260      * @param data        A piece of data that was persisted by the persistData call.
261      *                    This should NEVER be null.
262      */
263     protected abstract void applyState(ActorRef clientActor, String identifier,
264         Object data);
265
266     /**
267      * This method will be called by the RaftActor when a snapshot needs to be
268      * created. The derived actor should respond with its current state.
269      * <p/>
270      * During recovery the state that is returned by the derived actor will
271      * be passed back to it by calling the applySnapshot  method
272      *
273      * @return The current state of the actor
274      */
275     protected abstract Object createSnapshot();
276
277     /**
278      * This method will be called by the RaftActor during recovery to
279      * reconstruct the state of the actor.
280      * <p/>
281      * This method may also be called at any other point during normal
282      * operations when the derived actor is out of sync with it's peers
283      * and the only way to bring it in sync is by applying a snapshot
284      *
285      * @param snapshot A snapshot of the state of the actor
286      */
287     protected abstract void applySnapshot(Object snapshot);
288
289     private RaftActorBehavior switchBehavior(RaftState state) {
290         if (currentBehavior != null) {
291             if (currentBehavior.state() == state) {
292                 return currentBehavior;
293             }
294             LOG.info("Switching from state " + currentBehavior.state() + " to "
295                 + state);
296
297             try {
298                 currentBehavior.close();
299             } catch (Exception e) {
300                 LOG.error(e,
301                     "Failed to close behavior : " + currentBehavior.state());
302             }
303
304         } else {
305             LOG.info("Switching behavior to " + state);
306         }
307         RaftActorBehavior behavior = null;
308         if (state == RaftState.Candidate) {
309             behavior = new Candidate(context);
310         } else if (state == RaftState.Follower) {
311             behavior = new Follower(context);
312         } else {
313             behavior = new Leader(context);
314         }
315         return behavior;
316     }
317
318     private void trimPersistentData(long sequenceNumber) {
319         // Trim snapshots
320         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
321         // For now guessing that it is ANDed.
322         deleteSnapshots(new SnapshotSelectionCriteria(
323             sequenceNumber - 100000, 43200000));
324
325         // Trim journal
326         deleteMessages(sequenceNumber);
327     }
328
329
330     private class ReplicatedLogImpl implements ReplicatedLog {
331         private final List<ReplicatedLogEntry> journal;
332         private final Object snapshot;
333         private long snapshotIndex = -1;
334         private long snapshotTerm = -1;
335
336         public ReplicatedLogImpl(Snapshot snapshot) {
337             this.snapshot = snapshot.getState();
338             this.snapshotIndex = snapshot.getLastAppliedIndex();
339             this.snapshotTerm = snapshot.getLastAppliedTerm();
340
341             this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
342         }
343
344         public ReplicatedLogImpl() {
345             this.snapshot = null;
346             this.journal = new ArrayList<>();
347         }
348
349         @Override public ReplicatedLogEntry get(long index) {
350             int adjustedIndex = adjustedIndex(index);
351
352             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
353                 return null;
354             }
355
356             return journal.get(adjustedIndex);
357         }
358
359         @Override public ReplicatedLogEntry last() {
360             if (journal.size() == 0) {
361                 return null;
362             }
363             return get(journal.size() - 1);
364         }
365
366         @Override public long lastIndex() {
367             if (journal.size() == 0) {
368                 return -1;
369             }
370
371             return last().getIndex();
372         }
373
374         @Override public long lastTerm() {
375             if (journal.size() == 0) {
376                 return -1;
377             }
378
379             return last().getTerm();
380         }
381
382
383         @Override public void removeFrom(long index) {
384             int adjustedIndex = adjustedIndex(index);
385
386             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
387                 return;
388             }
389
390             journal.subList(adjustedIndex , journal.size()).clear();
391         }
392
393
394         @Override public void removeFromAndPersist(long index) {
395             int adjustedIndex = adjustedIndex(index);
396
397             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
398                 return;
399             }
400
401             // FIXME: Maybe this should be done after the command is saved
402             journal.subList(adjustedIndex , journal.size()).clear();
403
404             persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
405
406                 @Override public void apply(DeleteEntries param)
407                     throws Exception {
408                     //FIXME : Doing nothing for now
409                 }
410             });
411
412
413         }
414
415         @Override public void append(
416             final ReplicatedLogEntry replicatedLogEntry) {
417             journal.add(replicatedLogEntry);
418         }
419
420         @Override public List<ReplicatedLogEntry> getFrom(long index) {
421             int adjustedIndex = adjustedIndex(index);
422
423             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
424             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
425                 return entries;
426             }
427
428
429             for (int i = adjustedIndex;
430                  i < journal.size(); i++) {
431                 entries.add(journal.get(i));
432             }
433             return entries;
434         }
435
436         @Override public void appendAndPersist(
437             final ReplicatedLogEntry replicatedLogEntry) {
438             appendAndPersist(null, null, replicatedLogEntry);
439         }
440
441         public void appendAndPersist(final ActorRef clientActor,
442             final String identifier,
443             final ReplicatedLogEntry replicatedLogEntry) {
444             context.getLogger().debug(
445                 "Append log entry and persist " + replicatedLogEntry);
446             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
447             journal.add(replicatedLogEntry);
448
449             // When persisting events with persist it is guaranteed that the
450             // persistent actor will not receive further commands between the
451             // persist call and the execution(s) of the associated event
452             // handler. This also holds for multiple persist calls in context
453             // of a single command.
454             persist(replicatedLogEntry,
455                 new Procedure<ReplicatedLogEntry>() {
456                     public void apply(ReplicatedLogEntry evt) throws Exception {
457                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
458                         if (size() > 100000) {
459                             ReplicatedLogEntry lastAppliedEntry =
460                                 get(context.getLastApplied());
461                             long lastAppliedIndex = -1;
462                             long lastAppliedTerm = -1;
463                             if (lastAppliedEntry != null) {
464                                 lastAppliedIndex = lastAppliedEntry.getIndex();
465                                 lastAppliedTerm = lastAppliedEntry.getTerm();
466                             }
467
468                             saveSnapshot(Snapshot.create(createSnapshot(),
469                                 getFrom(context.getLastApplied() + 1),
470                                 lastIndex(), lastTerm(), lastAppliedIndex,
471                                 lastAppliedTerm));
472                         }
473                         // Send message for replication
474                         if (clientActor != null) {
475                             currentBehavior.handleMessage(getSelf(),
476                                 new Replicate(clientActor, identifier,
477                                     replicatedLogEntry)
478                             );
479                         }
480                     }
481                 }
482             );
483         }
484
485         @Override public long size() {
486             return journal.size() + snapshotIndex + 1;
487         }
488
489         @Override public boolean isPresent(long index) {
490             int adjustedIndex = adjustedIndex(index);
491
492             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
493                 return false;
494             }
495             return true;
496         }
497
498         @Override public boolean isInSnapshot(long index) {
499             return index <= snapshotIndex;
500         }
501
502         @Override public Object getSnapshot() {
503             return snapshot;
504         }
505
506         @Override public long getSnapshotIndex() {
507             return snapshotIndex;
508         }
509
510         @Override public long getSnapshotTerm() {
511             return snapshotTerm;
512         }
513
514         private int adjustedIndex(long index) {
515             if(snapshotIndex < 0){
516                 return (int) index;
517             }
518             return (int) (index - snapshotIndex);
519         }
520     }
521
522
523     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
524         Serializable {
525
526         private final long index;
527         private final long term;
528         private final Object payload;
529
530         public ReplicatedLogImplEntry(long index, long term, Object payload) {
531
532             this.index = index;
533             this.term = term;
534             this.payload = payload;
535         }
536
537         @Override public Object getData() {
538             return payload;
539         }
540
541         @Override public long getTerm() {
542             return term;
543         }
544
545         @Override public long getIndex() {
546             return index;
547         }
548
549         @Override public String toString() {
550             return "Entry{" +
551                 "index=" + index +
552                 ", term=" + term +
553                 '}';
554         }
555     }
556
557     private static class DeleteEntries implements Serializable {
558         private final int fromIndex;
559
560
561         public DeleteEntries(int fromIndex) {
562             this.fromIndex = fromIndex;
563         }
564
565         public int getFromIndex() {
566             return fromIndex;
567         }
568     }
569
570
571     private static class Snapshot implements Serializable {
572         private final Object state;
573         private final List<ReplicatedLogEntry> unAppliedEntries;
574         private final long lastIndex;
575         private final long lastTerm;
576         private final long lastAppliedIndex;
577         private final long lastAppliedTerm;
578
579         private Snapshot(Object state,
580             List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
581             long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
582             this.state = state;
583             this.unAppliedEntries = unAppliedEntries;
584             this.lastIndex = lastIndex;
585             this.lastTerm = lastTerm;
586             this.lastAppliedIndex = lastAppliedIndex;
587             this.lastAppliedTerm = lastAppliedTerm;
588         }
589
590
591         public static Snapshot create(Object state,
592             List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
593             long lastAppliedIndex, long lastAppliedTerm) {
594             return new Snapshot(state, entries, lastIndex, lastTerm,
595                 lastAppliedIndex, lastAppliedTerm);
596         }
597
598         public Object getState() {
599             return state;
600         }
601
602         public List<ReplicatedLogEntry> getUnAppliedEntries() {
603             return unAppliedEntries;
604         }
605
606         public long getLastTerm() {
607             return lastTerm;
608         }
609
610         public long getLastAppliedIndex() {
611             return lastAppliedIndex;
612         }
613
614         public long getLastAppliedTerm() {
615             return lastAppliedTerm;
616         }
617     }
618
619     private class ElectionTermImpl implements ElectionTerm {
620         /**
621          * Identifier of the actor whose election term information this is
622          */
623         private long currentTerm = 0;
624         private String votedFor = null;
625
626         public long getCurrentTerm() {
627             return currentTerm;
628         }
629
630         public String getVotedFor() {
631             return votedFor;
632         }
633
634         @Override public void update(long currentTerm, String votedFor) {
635             LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
636
637             this.currentTerm = currentTerm;
638             this.votedFor = votedFor;
639         }
640
641         @Override
642         public void updateAndPersist(long currentTerm, String votedFor){
643             update(currentTerm, votedFor);
644             // FIXME : Maybe first persist then update the state
645             persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
646
647                 @Override public void apply(UpdateElectionTerm param)
648                     throws Exception {
649
650                 }
651             });
652         }
653     }
654
655     private static class UpdateElectionTerm implements Serializable {
656         private final long currentTerm;
657         private final String votedFor;
658
659         public UpdateElectionTerm(long currentTerm, String votedFor) {
660             this.currentTerm = currentTerm;
661             this.votedFor = votedFor;
662         }
663
664         public long getCurrentTerm() {
665             return currentTerm;
666         }
667
668         public String getVotedFor() {
669             return votedFor;
670         }
671     }
672
673 }