Test driver and changes related to it
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
23 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
24 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
27 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
28 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
29 import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
31 import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38
39 /**
40  * RaftActor encapsulates a state machine that needs to be kept synchronized
41  * in a cluster. It implements the RAFT algorithm as described in the paper
42  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
43  * In Search of an Understandable Consensus Algorithm</a>
44  * <p/>
45  * RaftActor has 3 states and each state has a certain behavior associated
46  * with it. A Raft actor can behave as,
47  * <ul>
48  * <li> A Leader </li>
49  * <li> A Follower (or) </li>
50  * <li> A Candidate </li>
51  * </ul>
52  * <p/>
53  * <p/>
54  * A RaftActor MUST be a Leader in order to accept requests from clients to
55  * change the state of it's encapsulated state machine. Once a RaftActor becomes
56  * a Leader it is also responsible for ensuring that all followers ultimately
57  * have the same log and therefore the same state machine as itself.
58  * <p/>
59  * <p/>
60  * The current behavior of a RaftActor determines how election for leadership
61  * is initiated and how peer RaftActors react to request for votes.
62  * <p/>
63  * <p/>
64  * Each RaftActor also needs to know the current election term. It uses this
65  * information for a couple of things. One is to simply figure out who it
66  * voted for in the last election. Another is to figure out if the message
67  * it received to update it's state is stale.
68  * <p/>
69  * <p/>
70  * The RaftActor uses akka-persistence to store it's replicated log.
71  * Furthermore through it's behaviors a Raft Actor determines
72  * <p/>
73  * <ul>
74  * <li> when a log entry should be persisted </li>
75  * <li> when a log entry should be applied to the state machine (and) </li>
76  * <li> when a snapshot should be saved </li>
77  * </ul>
78  */
79 public abstract class RaftActor extends UntypedPersistentActor {
80     protected final LoggingAdapter LOG =
81         Logging.getLogger(getContext().system(), this);
82
83     /**
84      * The current state determines the current behavior of a RaftActor
85      * A Raft Actor always starts off in the Follower State
86      */
87     private RaftActorBehavior currentBehavior;
88
89     /**
90      * This context should NOT be passed directly to any other actor it is
91      * only to be consumed by the RaftActorBehaviors
92      */
93     private RaftActorContext context;
94
95     /**
96      * The in-memory journal
97      */
98     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
99
100
101     public RaftActor(String id, Map<String, String> peerAddresses) {
102         context = new RaftActorContextImpl(this.getSelf(),
103             this.getContext(),
104             id, new ElectionTermImpl(getSelf().path().toString()),
105             -1, -1, replicatedLog, peerAddresses, LOG);
106     }
107
108     @Override public void onReceiveRecover(Object message) {
109         if (message instanceof SnapshotOffer) {
110             SnapshotOffer offer = (SnapshotOffer) message;
111             Snapshot snapshot = (Snapshot) offer.snapshot();
112
113             // Create a replicated log with the snapshot information
114             // The replicated log can be used later on to retrieve this snapshot
115             // when we need to install it on a peer
116             replicatedLog = new ReplicatedLogImpl(snapshot);
117
118             // Apply the snapshot to the actors state
119             applySnapshot(snapshot.getState());
120
121         } else if (message instanceof ReplicatedLogEntry) {
122             replicatedLog.append((ReplicatedLogEntry) message);
123         } else if (message instanceof RecoveryCompleted) {
124             LOG.debug(
125                 "Last index in log : " + replicatedLog.lastIndex());
126             currentBehavior = switchBehavior(RaftState.Follower);
127         }
128     }
129
130     @Override public void onReceiveCommand(Object message) {
131         if (message instanceof ApplyState){
132             ApplyState applyState = (ApplyState) message;
133
134             LOG.debug("Applying state for log index {}",
135                 applyState.getReplicatedLogEntry().getIndex());
136
137             applyState(applyState.getClientActor(), applyState.getIdentifier(),
138                 applyState.getReplicatedLogEntry().getData());
139         } else if(message instanceof ApplySnapshot ) {
140             applySnapshot(((ApplySnapshot) message).getSnapshot());
141         } else if (message instanceof FindLeader) {
142             getSender().tell(
143                 new FindLeaderReply(
144                     context.getPeerAddress(currentBehavior.getLeaderId())),
145                 getSelf()
146             );
147         } else if (message instanceof SaveSnapshotSuccess) {
148             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
149
150             // TODO: Not sure if we want to be this aggressive with trimming stuff
151             trimPersistentData(success.metadata().sequenceNr());
152
153         } else if (message instanceof SaveSnapshotFailure) {
154             // TODO: Handle failure in saving the snapshot
155         } else if (message instanceof FindLeader){
156             getSender().tell(new FindLeaderReply(
157                 context.getPeerAddress(currentBehavior.getLeaderId())),
158                 getSelf());
159
160         } else if (message instanceof AddRaftPeer){
161             AddRaftPeer arp = (AddRaftPeer)message;
162            context.addToPeers(arp.getName(), arp.getAddress());
163
164         } else if (message instanceof RemoveRaftPeer){
165             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
166             context.removePeer(rrp.getName());
167         } else {
168             RaftState state =
169                 currentBehavior.handleMessage(getSender(), message);
170             currentBehavior = switchBehavior(state);
171         }
172     }
173
174
175
176     /**
177      * When a derived RaftActor needs to persist something it must call
178      * persistData.
179      *
180      * @param clientActor
181      * @param identifier
182      * @param data
183      */
184     protected void persistData(ActorRef clientActor, String identifier,
185         Object data) {
186         LOG.debug("Persist data " + identifier);
187         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
188             context.getReplicatedLog().lastIndex() + 1,
189             context.getTermInformation().getCurrentTerm(), data);
190
191         replicatedLog
192             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
193     }
194
195     protected String getId() {
196         return context.getId();
197     }
198
199     /**
200      * Derived actors can call the isLeader method to check if the current
201      * RaftActor is the Leader or not
202      *
203      * @return true it this RaftActor is a Leader false otherwise
204      */
205     protected boolean isLeader() {
206         return context.getId().equals(currentBehavior.getLeaderId());
207     }
208
209     /**
210      * Derived actor can call getLeader if they need a reference to the Leader.
211      * This would be useful for example in forwarding a request to an actor
212      * which is the leader
213      *
214      * @return A reference to the leader if known, null otherwise
215      */
216     protected ActorSelection getLeader(){
217         String leaderId = currentBehavior.getLeaderId();
218         if (leaderId == null) {
219             return null;
220         }
221         String peerAddress = context.getPeerAddress(leaderId);
222         LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
223             + peerAddress);
224         return context.actorSelection(peerAddress);
225     }
226
227     protected RaftState getRaftState() {
228         return currentBehavior.state();
229     }
230
231
232
233     /**
234      * The applyState method will be called by the RaftActor when some data
235      * needs to be applied to the actor's state
236      *
237      * @param clientActor A reference to the client who sent this message. This
238      *                    is the same reference that was passed to persistData
239      *                    by the derived actor. clientActor may be null when
240      *                    the RaftActor is behaving as a follower or during
241      *                    recovery.
242      * @param identifier  The identifier of the persisted data. This is also
243      *                    the same identifier that was passed to persistData by
244      *                    the derived actor. identifier may be null when
245      *                    the RaftActor is behaving as a follower or during
246      *                    recovery
247      * @param data        A piece of data that was persisted by the persistData call.
248      *                    This should NEVER be null.
249      */
250     protected abstract void applyState(ActorRef clientActor, String identifier,
251         Object data);
252
253     /**
254      * This method will be called by the RaftActor when a snapshot needs to be
255      * created. The derived actor should respond with its current state.
256      * <p/>
257      * During recovery the state that is returned by the derived actor will
258      * be passed back to it by calling the applySnapshot  method
259      *
260      * @return The current state of the actor
261      */
262     protected abstract Object createSnapshot();
263
264     /**
265      * This method will be called by the RaftActor during recovery to
266      * reconstruct the state of the actor.
267      * <p/>
268      * This method may also be called at any other point during normal
269      * operations when the derived actor is out of sync with it's peers
270      * and the only way to bring it in sync is by applying a snapshot
271      *
272      * @param snapshot A snapshot of the state of the actor
273      */
274     protected abstract void applySnapshot(Object snapshot);
275
276     private RaftActorBehavior switchBehavior(RaftState state) {
277         if (currentBehavior != null) {
278             if (currentBehavior.state() == state) {
279                 return currentBehavior;
280             }
281             LOG.info("Switching from state " + currentBehavior.state() + " to "
282                 + state);
283
284             try {
285                 currentBehavior.close();
286             } catch (Exception e) {
287                 LOG.error(e,
288                     "Failed to close behavior : " + currentBehavior.state());
289             }
290
291         } else {
292             LOG.info("Switching behavior to " + state);
293         }
294         RaftActorBehavior behavior = null;
295         if (state == RaftState.Candidate) {
296             behavior = new Candidate(context);
297         } else if (state == RaftState.Follower) {
298             behavior = new Follower(context);
299         } else {
300             behavior = new Leader(context);
301         }
302         return behavior;
303     }
304
305     private void trimPersistentData(long sequenceNumber) {
306         // Trim snapshots
307         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
308         // For now guessing that it is ANDed.
309         deleteSnapshots(new SnapshotSelectionCriteria(
310             sequenceNumber - 100000, 43200000));
311
312         // Trim journal
313         deleteMessages(sequenceNumber);
314     }
315
316     private class ReplicatedLogImpl implements ReplicatedLog {
317         private final List<ReplicatedLogEntry> journal;
318         private final Object snapshot;
319         private long snapshotIndex = -1;
320         private long snapshotTerm = -1;
321
322         public ReplicatedLogImpl(Snapshot snapshot) {
323             this.snapshot = snapshot.getState();
324             this.snapshotIndex = snapshot.getLastAppliedIndex();
325             this.snapshotTerm = snapshot.getLastAppliedTerm();
326
327             this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
328         }
329
330         public ReplicatedLogImpl() {
331             this.snapshot = null;
332             this.journal = new ArrayList<>();
333         }
334
335         @Override public ReplicatedLogEntry get(long index) {
336             int adjustedIndex = adjustedIndex(index);
337
338             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
339                 return null;
340             }
341
342             return journal.get(adjustedIndex);
343         }
344
345         @Override public ReplicatedLogEntry last() {
346             if (journal.size() == 0) {
347                 return null;
348             }
349             return get(journal.size() - 1);
350         }
351
352         @Override public long lastIndex() {
353             if (journal.size() == 0) {
354                 return -1;
355             }
356
357             return last().getIndex();
358         }
359
360         @Override public long lastTerm() {
361             if (journal.size() == 0) {
362                 return -1;
363             }
364
365             return last().getTerm();
366         }
367
368
369         @Override public void removeFrom(long index) {
370             int adjustedIndex = adjustedIndex(index);
371
372             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
373                 return;
374             }
375             for (int i = adjustedIndex;
376                  i < journal.size(); i++) {
377                 deleteMessage(i);
378                 journal.remove(i);
379             }
380         }
381
382         @Override public void append(
383             final ReplicatedLogEntry replicatedLogEntry) {
384             journal.add(replicatedLogEntry);
385         }
386
387         @Override public List<ReplicatedLogEntry> getFrom(long index) {
388             int adjustedIndex = adjustedIndex(index);
389
390             List<ReplicatedLogEntry> entries = new ArrayList<>(100);
391             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
392                 return entries;
393             }
394             for (int i = adjustedIndex;
395                  i < journal.size(); i++) {
396                 entries.add(journal.get(i));
397             }
398             return entries;
399         }
400
401         @Override public void appendAndPersist(
402             final ReplicatedLogEntry replicatedLogEntry) {
403             appendAndPersist(null, null, replicatedLogEntry);
404         }
405
406         public void appendAndPersist(final ActorRef clientActor,
407             final String identifier,
408             final ReplicatedLogEntry replicatedLogEntry) {
409             context.getLogger().debug(
410                 "Append log entry and persist " + replicatedLogEntry.getIndex());
411             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
412             journal.add(replicatedLogEntry);
413
414             // When persisting events with persist it is guaranteed that the
415             // persistent actor will not receive further commands between the
416             // persist call and the execution(s) of the associated event
417             // handler. This also holds for multiple persist calls in context
418             // of a single command.
419             persist(replicatedLogEntry,
420                 new Procedure<ReplicatedLogEntry>() {
421                     public void apply(ReplicatedLogEntry evt) throws Exception {
422                         // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
423                         if (size() > 100000) {
424                             ReplicatedLogEntry lastAppliedEntry =
425                                 get(context.getLastApplied());
426                             long lastAppliedIndex = -1;
427                             long lastAppliedTerm = -1;
428                             if (lastAppliedEntry != null) {
429                                 lastAppliedIndex = lastAppliedEntry.getIndex();
430                                 lastAppliedTerm = lastAppliedEntry.getTerm();
431                             }
432
433                             saveSnapshot(Snapshot.create(createSnapshot(),
434                                 getFrom(context.getLastApplied() + 1),
435                                 lastIndex(), lastTerm(), lastAppliedIndex,
436                                 lastAppliedTerm));
437                         }
438                         // Send message for replication
439                         if (clientActor != null) {
440                             currentBehavior.handleMessage(getSelf(),
441                                 new Replicate(clientActor, identifier,
442                                     replicatedLogEntry)
443                             );
444                         }
445                     }
446                 }
447             );
448         }
449
450         @Override public long size() {
451             return journal.size() + snapshotIndex;
452         }
453
454         @Override public boolean isPresent(long index) {
455             int adjustedIndex = adjustedIndex(index);
456
457             if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
458                 return false;
459             }
460             return true;
461         }
462
463         @Override public boolean isInSnapshot(long index) {
464             return index <= snapshotIndex;
465         }
466
467         @Override public Object getSnapshot() {
468             return snapshot;
469         }
470
471         @Override public long getSnapshotIndex() {
472             return snapshotIndex;
473         }
474
475         @Override public long getSnapshotTerm() {
476             return snapshotTerm;
477         }
478
479         private int adjustedIndex(long index) {
480             if(snapshotIndex < 0){
481                 return (int) index;
482             }
483             return (int) (index - snapshotIndex);
484         }
485     }
486
487
488     private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
489         Serializable {
490
491         private final long index;
492         private final long term;
493         private final Object payload;
494
495         public ReplicatedLogImplEntry(long index, long term, Object payload) {
496
497             this.index = index;
498             this.term = term;
499             this.payload = payload;
500         }
501
502         @Override public Object getData() {
503             return payload;
504         }
505
506         @Override public long getTerm() {
507             return term;
508         }
509
510         @Override public long getIndex() {
511             return index;
512         }
513     }
514
515
516     private static class Snapshot implements Serializable {
517         private final Object state;
518         private final List<ReplicatedLogEntry> unAppliedEntries;
519         private final long lastIndex;
520         private final long lastTerm;
521         private final long lastAppliedIndex;
522         private final long lastAppliedTerm;
523
524         private Snapshot(Object state,
525             List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
526             long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
527             this.state = state;
528             this.unAppliedEntries = unAppliedEntries;
529             this.lastIndex = lastIndex;
530             this.lastTerm = lastTerm;
531             this.lastAppliedIndex = lastAppliedIndex;
532             this.lastAppliedTerm = lastAppliedTerm;
533         }
534
535
536         public static Snapshot create(Object state,
537             List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
538             long lastAppliedIndex, long lastAppliedTerm) {
539             return new Snapshot(state, entries, lastIndex, lastTerm,
540                 lastAppliedIndex, lastAppliedTerm);
541         }
542
543         public Object getState() {
544             return state;
545         }
546
547         public List<ReplicatedLogEntry> getUnAppliedEntries() {
548             return unAppliedEntries;
549         }
550
551         public long getLastTerm() {
552             return lastTerm;
553         }
554
555         public long getLastAppliedIndex() {
556             return lastAppliedIndex;
557         }
558
559         public long getLastAppliedTerm() {
560             return lastAppliedTerm;
561         }
562     }
563
564
565 }