Merge "(Bug 2035) - Increasing default Akka config for auto-down of unreachable nodes...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.japi.Procedure;
14 import akka.persistence.RecoveryCompleted;
15 import akka.persistence.SaveSnapshotFailure;
16 import akka.persistence.SaveSnapshotSuccess;
17 import akka.persistence.SnapshotOffer;
18 import akka.persistence.SnapshotSelectionCriteria;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.protobuf.ByteString;
23 import java.io.Serializable;
24 import java.util.Map;
25 import org.opendaylight.controller.cluster.DataPersistenceProvider;
26 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
27 import org.opendaylight.controller.cluster.notifications.RoleChanged;
28 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
33 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
34 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
39 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
40 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
43 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * RaftActor encapsulates a state machine that needs to be kept synchronized
49  * in a cluster. It implements the RAFT algorithm as described in the paper
50  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
51  * In Search of an Understandable Consensus Algorithm</a>
52  * <p/>
53  * RaftActor has 3 states and each state has a certain behavior associated
54  * with it. A Raft actor can behave as,
55  * <ul>
56  * <li> A Leader </li>
57  * <li> A Follower (or) </li>
58  * <li> A Candidate </li>
59  * </ul>
60  * <p/>
61  * <p/>
62  * A RaftActor MUST be a Leader in order to accept requests from clients to
63  * change the state of its encapsulated state machine. Once a RaftActor becomes
64  * a Leader it is also responsible for ensuring that all followers ultimately
65  * have the same log and therefore the same state machine as itself.
66  * <p/>
67  * <p/>
68  * The current behavior of a RaftActor determines how election for leadership
69  * is initiated and how peer RaftActors react to request for votes.
70  * <p/>
71  * <p/>
72  * Each RaftActor also needs to know the current election term. It uses this
73  * information for a couple of things. One is to simply figure out who it
74  * voted for in the last election. Another is to figure out if the message
75  * it received to update its state is stale.
76  * <p/>
77  * <p/>
78  * The RaftActor uses akka-persistence to store its replicated log.
79  * Furthermore, through its behaviors a RaftActor determines
80  * <p/>
81  * <ul>
82  * <li> when a log entry should be persisted </li>
83  * <li> when a log entry should be applied to the state machine (and) </li>
84  * <li> when a snapshot should be saved </li>
85  * </ul>
86  */
87 public abstract class RaftActor extends AbstractUntypedPersistentActor {
88     protected final Logger LOG = LoggerFactory.getLogger(getClass());
89
90     /**
91      * The current state determines the current behavior of a RaftActor
92      * A Raft Actor always starts off in the Follower State
93      */
94     private RaftActorBehavior currentBehavior;
95
96     /**
97      * This context should NOT be passed directly to any other actor it is
98      * only to be consumed by the RaftActorBehaviors
99      */
100     private final RaftActorContext context;
101
102     /**
103      * The in-memory journal
104      */
105     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
106
107     private CaptureSnapshot captureSnapshot = null;
108
109     private Stopwatch recoveryTimer;
110
111     private int currentRecoveryBatchCount;
112
    /**
     * Creates a RaftActor without explicit configuration. Delegates to the
     * three-argument constructor with an absent {@code ConfigParams}, which
     * makes that constructor fall back to {@code DefaultConfigParamsImpl}.
     *
     * @param id            identifier handed to the actor context
     * @param peerAddresses peer addresses handed to the actor context
     */
113     public RaftActor(String id, Map<String, String> peerAddresses) {
114         this(id, peerAddresses, Optional.<ConfigParams>absent());
115     }
116
117     public RaftActor(String id, Map<String, String> peerAddresses,
118          Optional<ConfigParams> configParams) {
119
120         context = new RaftActorContextImpl(this.getSelf(),
121             this.getContext(), id, new ElectionTermImpl(),
122             -1, -1, replicatedLog, peerAddresses,
123             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
124             LOG);
125     }
126
127     private void initRecoveryTimer() {
128         if(recoveryTimer == null) {
129             recoveryTimer = new Stopwatch();
130             recoveryTimer.start();
131         }
132     }
133
134     @Override
135     public void preStart() throws Exception {
136         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
137                 context.getConfigParams().getJournalRecoveryLogBatchSize());
138
139         super.preStart();
140     }
141
142     @Override
143     public void handleRecover(Object message) {
144         if(persistence().isRecoveryApplicable()) {
145             if (message instanceof SnapshotOffer) {
146                 onRecoveredSnapshot((SnapshotOffer) message);
147             } else if (message instanceof ReplicatedLogEntry) {
148                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
149             } else if (message instanceof ApplyLogEntries) {
150                 onRecoveredApplyLogEntries((ApplyLogEntries) message);
151             } else if (message instanceof DeleteEntries) {
152                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
153             } else if (message instanceof UpdateElectionTerm) {
154                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
155                         ((UpdateElectionTerm) message).getVotedFor());
156             } else if (message instanceof RecoveryCompleted) {
157                 onRecoveryCompletedMessage();
158             }
159         } else {
160             if (message instanceof RecoveryCompleted) {
161                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
162                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
163                 deleteMessages(lastSequenceNr());
164
165                 // Delete all the akka snapshots as they will not be needed
166                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
167
168                 onRecoveryComplete();
169
170                 initializeBehavior();
171             }
172         }
173     }
174
175     private void onRecoveredSnapshot(SnapshotOffer offer) {
176         if(LOG.isDebugEnabled()) {
177             LOG.debug("{}: SnapshotOffer called..", persistenceId());
178         }
179
180         initRecoveryTimer();
181
182         Snapshot snapshot = (Snapshot) offer.snapshot();
183
184         // Create a replicated log with the snapshot information
185         // The replicated log can be used later on to retrieve this snapshot
186         // when we need to install it on a peer
187         replicatedLog = new ReplicatedLogImpl(snapshot);
188
189         context.setReplicatedLog(replicatedLog);
190         context.setLastApplied(snapshot.getLastAppliedIndex());
191         context.setCommitIndex(snapshot.getLastAppliedIndex());
192
193         Stopwatch timer = new Stopwatch();
194         timer.start();
195
196         // Apply the snapshot to the actors state
197         applyRecoverySnapshot(snapshot.getState());
198
199         timer.stop();
200         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
201                 replicatedLog.size(), persistenceId(), timer.toString(),
202                 replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
203     }
204
205     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
206         if(LOG.isDebugEnabled()) {
207             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
208         }
209
210         replicatedLog.append(logEntry);
211     }
212
213     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
214         if(LOG.isDebugEnabled()) {
215             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
216                     persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
217         }
218
219         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
220             batchRecoveredLogEntry(replicatedLog.get(i));
221         }
222
223         context.setLastApplied(ale.getToIndex());
224         context.setCommitIndex(ale.getToIndex());
225     }
226
227     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
228         initRecoveryTimer();
229
230         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
231         if(currentRecoveryBatchCount == 0) {
232             startLogRecoveryBatch(batchSize);
233         }
234
235         appendRecoveredLogEntry(logEntry.getData());
236
237         if(++currentRecoveryBatchCount >= batchSize) {
238             endCurrentLogRecoveryBatch();
239         }
240     }
241
    /**
     * Closes the current recovery batch: asks the derived actor to apply the
     * batched entries and then resets the batch counter. The counter is only
     * reset after applyCurrentLogRecoveryBatch() returns.
     */
242     private void endCurrentLogRecoveryBatch() {
243         applyCurrentLogRecoveryBatch();
244         currentRecoveryBatchCount = 0;
245     }
246
247     private void onRecoveryCompletedMessage() {
248         if(currentRecoveryBatchCount > 0) {
249             endCurrentLogRecoveryBatch();
250         }
251
252         onRecoveryComplete();
253
254         String recoveryTime = "";
255         if(recoveryTimer != null) {
256             recoveryTimer.stop();
257             recoveryTime = " in " + recoveryTimer.toString();
258             recoveryTimer = null;
259         }
260
261         LOG.info(
262             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
263                 "Persistence Id =  " + persistenceId() +
264                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
265                 "journal-size={}",
266             replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
267             replicatedLog.getSnapshotTerm(), replicatedLog.size());
268
269         initializeBehavior();
270     }
271
272     protected void initializeBehavior(){
273         changeCurrentBehavior(new Follower(context));
274     }
275
276     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
277         RaftActorBehavior oldBehavior = currentBehavior;
278         currentBehavior = newBehavior;
279         handleBehaviorChange(oldBehavior, currentBehavior);
280     }
281
282     @Override public void handleCommand(Object message) {
283         if (message instanceof ApplyState){
284             ApplyState applyState = (ApplyState) message;
285
286             if(LOG.isDebugEnabled()) {
287                 LOG.debug("{}: Applying state for log index {} data {}",
288                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
289                     applyState.getReplicatedLogEntry().getData());
290             }
291
292             applyState(applyState.getClientActor(), applyState.getIdentifier(),
293                 applyState.getReplicatedLogEntry().getData());
294
295         } else if (message instanceof ApplyLogEntries){
296             ApplyLogEntries ale = (ApplyLogEntries) message;
297             if(LOG.isDebugEnabled()) {
298                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
299             }
300             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
301                 @Override
302                 public void apply(ApplyLogEntries param) throws Exception {
303                 }
304             });
305
306         } else if(message instanceof ApplySnapshot ) {
307             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
308
309             if(LOG.isDebugEnabled()) {
310                 LOG.debug("{}: ApplySnapshot called on Follower Actor " +
311                         "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
312                     snapshot.getLastAppliedTerm()
313                 );
314             }
315
316             applySnapshot(snapshot.getState());
317
318             //clears the followers log, sets the snapshot index to ensure adjusted-index works
319             replicatedLog = new ReplicatedLogImpl(snapshot);
320             context.setReplicatedLog(replicatedLog);
321             context.setLastApplied(snapshot.getLastAppliedIndex());
322
323         } else if (message instanceof FindLeader) {
324             getSender().tell(
325                 new FindLeaderReply(getLeaderAddress()),
326                 getSelf()
327             );
328
329         } else if (message instanceof SaveSnapshotSuccess) {
330             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
331             LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
332
333             long sequenceNumber = success.metadata().sequenceNr();
334
335             commitSnapshot(sequenceNumber);
336
337         } else if (message instanceof SaveSnapshotFailure) {
338             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
339
340             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
341                     persistenceId(), saveSnapshotFailure.cause());
342
343             context.getReplicatedLog().snapshotRollback();
344
345             LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
346                 "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
347                 context.getReplicatedLog().getSnapshotIndex(),
348                 context.getReplicatedLog().getSnapshotTerm(),
349                 context.getReplicatedLog().size());
350
351         } else if (message instanceof CaptureSnapshot) {
352             LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
353
354             if(captureSnapshot == null) {
355                 captureSnapshot = (CaptureSnapshot)message;
356                 createSnapshot();
357             }
358
359         } else if (message instanceof CaptureSnapshotReply){
360             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
361
362         } else {
363             if (!(message instanceof AppendEntriesMessages.AppendEntries)
364                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
365                 if(LOG.isDebugEnabled()) {
366                     LOG.debug("{}: onReceiveCommand: message: {}", persistenceId(), message.getClass());
367                 }
368             }
369
370             RaftActorBehavior oldBehavior = currentBehavior;
371             currentBehavior = currentBehavior.handleMessage(getSender(), message);
372
373             handleBehaviorChange(oldBehavior, currentBehavior);
374         }
375     }
376
377     private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
378         if (oldBehavior != currentBehavior){
379             onStateChanged();
380         }
381
382         String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
383         String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
384
385         // it can happen that the state has not changed but the leader has changed.
386         onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
387
388         if (getRoleChangeNotifier().isPresent() &&
389                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
390             getRoleChangeNotifier().get().tell(
391                     new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
392                     getSelf());
393         }
394     }
395
396     /**
397      * When a derived RaftActor needs to persist something it must call
398      * persistData.
399      *
400      * @param clientActor
401      * @param identifier
402      * @param data
403      */
404     protected void persistData(final ActorRef clientActor, final String identifier,
405         final Payload data) {
406
407         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
408             context.getReplicatedLog().lastIndex() + 1,
409             context.getTermInformation().getCurrentTerm(), data);
410
411         if(LOG.isDebugEnabled()) {
412             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
413         }
414
415         final RaftActorContext raftContext = getRaftActorContext();
416
417         replicatedLog
418                 .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
419                     @Override
420                     public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
421                         if(!hasFollowers()){
422                             // Increment the Commit Index and the Last Applied values
423                             raftContext.setCommitIndex(replicatedLogEntry.getIndex());
424                             raftContext.setLastApplied(replicatedLogEntry.getIndex());
425
426                             // Apply the state immediately
427                             applyState(clientActor, identifier, data);
428
429                             // Send a ApplyLogEntries message so that we write the fact that we applied
430                             // the state to durable storage
431                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
432
433                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
434                             if(!context.isSnapshotCaptureInitiated()){
435                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
436                                         raftContext.getTermInformation().getCurrentTerm());
437                                 raftContext.getReplicatedLog().snapshotCommit();
438                             } else {
439                                 LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
440                                         persistenceId(), getId());
441                             }
442                         } else if (clientActor != null) {
443                             // Send message for replication
444                             currentBehavior.handleMessage(getSelf(),
445                                     new Replicate(clientActor, identifier,
446                                             replicatedLogEntry)
447                             );
448                         }
449
450                     }
451                 });    }
452
453     protected String getId() {
454         return context.getId();
455     }
456
457     /**
458      * Derived actors can call the isLeader method to check if the current
459      * RaftActor is the Leader or not
460      *
461      * @return true it this RaftActor is a Leader false otherwise
462      */
463     protected boolean isLeader() {
464         return context.getId().equals(currentBehavior.getLeaderId());
465     }
466
467     /**
468      * Derived actor can call getLeader if they need a reference to the Leader.
469      * This would be useful for example in forwarding a request to an actor
470      * which is the leader
471      *
472      * @return A reference to the leader if known, null otherwise
473      */
474     protected ActorSelection getLeader(){
475         String leaderAddress = getLeaderAddress();
476
477         if(leaderAddress == null){
478             return null;
479         }
480
481         return context.actorSelection(leaderAddress);
482     }
483
484     /**
485      *
486      * @return the current leader's id
487      */
488     protected String getLeaderId(){
489         return currentBehavior.getLeaderId();
490     }
491
492     protected RaftState getRaftState() {
493         return currentBehavior.state();
494     }
495
496     protected ReplicatedLogEntry getLastLogEntry() {
497         return replicatedLog.last();
498     }
499
500     protected Long getCurrentTerm(){
501         return context.getTermInformation().getCurrentTerm();
502     }
503
504     protected Long getCommitIndex(){
505         return context.getCommitIndex();
506     }
507
508     protected Long getLastApplied(){
509         return context.getLastApplied();
510     }
511
512     protected RaftActorContext getRaftActorContext() {
513         return context;
514     }
515
516     /**
517      * setPeerAddress sets the address of a known peer at a later time.
518      * <p>
519      * This is to account for situations where a we know that a peer
520      * exists but we do not know an address up-front. This may also be used in
521      * situations where a known peer starts off in a different location and we
522      * need to change it's address
523      * <p>
524      * Note that if the peerId does not match the list of peers passed to
525      * this actor during construction an IllegalStateException will be thrown.
526      *
527      * @param peerId
528      * @param peerAddress
529      */
530     protected void setPeerAddress(String peerId, String peerAddress){
531         context.setPeerAddress(peerId, peerAddress);
532     }
533
    /**
     * Commits the pending snapshot on the replicated log and then trims the
     * persisted akka snapshots and journal. Called from handleCommand when a
     * SaveSnapshotSuccess message is received.
     *
     * @param sequenceNumber sequence number taken from the metadata of the
     *                       saved snapshot; passed on to trimPersistentData
     */
534     protected void commitSnapshot(long sequenceNumber) {
535         context.getReplicatedLog().snapshotCommit();
536
537         // TODO: Not sure if we want to be this aggressive with trimming stuff
538         trimPersistentData(sequenceNumber);
539     }
540
541     /**
542      * The applyState method will be called by the RaftActor when some data
543      * needs to be applied to the actor's state.
     * <p/>
     * Within this class it is invoked from handleCommand when an ApplyState
     * message arrives, and from persistData right after the entry has been
     * persisted when the actor has no followers.
544      *
545      * @param clientActor A reference to the client who sent this message. This
546      *                    is the same reference that was passed to persistData
547      *                    by the derived actor. clientActor may be null when
548      *                    the RaftActor is behaving as a follower or during
549      *                    recovery.
550      * @param identifier  The identifier of the persisted data. This is also
551      *                    the same identifier that was passed to persistData by
552      *                    the derived actor. identifier may be null when
553      *                    the RaftActor is behaving as a follower or during
554      *                    recovery
555      * @param data        A piece of data that was persisted by the persistData call.
556      *                    This should NEVER be null.
557      */
558     protected abstract void applyState(ActorRef clientActor, String identifier,
559         Object data);
560
561     /**
562      * This method is called during recovery at the start of a batch of state entries. Derived
563      * classes should perform any initialization needed to start a batch.
     *
     * @param maxBatchSize the configured journal recovery batch size; a batch is
     *                     closed once it holds this many entries
564      */
565     protected abstract void startLogRecoveryBatch(int maxBatchSize);
566
567     /**
568      * This method is called during recovery to append state data to the current batch. This method
569      * is called 1 or more times after {@link #startLogRecoveryBatch}.
570      *
571      * @param data the state data, taken from the recovered log entry
572      */
573     protected abstract void appendRecoveredLogEntry(Payload data);
574
575     /**
576      * This method is called during recovery to reconstruct the state of the actor.
577      *
578      * @param snapshotBytes The state held by the recovered snapshot
579      */
580     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
581
582     /**
583      * This method is called during recovery at the end of a batch to apply the current batched
584      * log entries. This method is called after {@link #appendRecoveredLogEntry}, either when the
     * batch has reached the maximum batch size or when recovery completes with entries pending.
585      */
586     protected abstract void applyCurrentLogRecoveryBatch();
587
588     /**
589      * This method is called when recovery is complete. It is invoked before the
     * initial behavior is installed, both when recovery was applied and when it
     * was skipped because recovery is not applicable.
590      */
591     protected abstract void onRecoveryComplete();
592
593     /**
594      * This method will be called by the RaftActor when a snapshot needs to be
595      * created. The derived actor should respond with its current state.
596      * <p/>
597      * The method itself returns nothing; the state is handled by this class when
598      * a CaptureSnapshotReply message is received (presumably sent by the derived
599      * actor in response to this call - confirm against implementations). During
600      * recovery a saved state is handed back through applyRecoverySnapshot.
601      */
602     protected abstract void createSnapshot();
603
604     /**
605      * This method can be called at any other point during normal
606      * operations when the derived actor is out of sync with its peers
607      * and the only way to bring it in sync is by applying a snapshot.
     * Within this class it is invoked when an ApplySnapshot message is received.
608      *
609      * @param snapshotBytes A snapshot of the state of the actor
610      */
611     protected abstract void applySnapshot(byte[] snapshotBytes);
612
613     /**
614      * This method will be called by the RaftActor when the state of the
615      * RaftActor changes, i.e. when the current behavior instance is replaced.
616      * The derived actor can then use methods like isLeader or getLeader to do
     * something useful
617      */
618     protected abstract void onStateChanged();
619
    /**
     * Supplies the persistence provider used by this actor. It is consulted to
     * decide whether recovery is applicable, and it is used to persist journal
     * entries and markers, to save and delete snapshots and to delete messages.
     *
     * @return the persistence provider
     */
620     protected abstract DataPersistenceProvider persistence();
621
622     /**
623      * Notifier Actor for this RaftActor to notify when a role change happens.
     * When present it is sent a RoleChanged message on every role change.
624      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
625      */
626     protected abstract Optional<ActorRef> getRoleChangeNotifier();
627
628     protected void onLeaderChanged(String oldLeader, String newLeader){};
629
630     private void trimPersistentData(long sequenceNumber) {
631         // Trim akka snapshots
632         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
633         // For now guessing that it is ANDed.
634         persistence().deleteSnapshots(new SnapshotSelectionCriteria(
635             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
636
637         // Trim akka journal
638         persistence().deleteMessages(sequenceNumber);
639     }
640
641     private String getLeaderAddress(){
642         if(isLeader()){
643             return getSelf().path().toString();
644         }
645         String leaderId = currentBehavior.getLeaderId();
646         if (leaderId == null) {
647             return null;
648         }
649         String peerAddress = context.getPeerAddress(leaderId);
650         if(LOG.isDebugEnabled()) {
651             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
652                     persistenceId(), leaderId, peerAddress);
653         }
654
655         return peerAddress;
656     }
657
658     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
659         LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
660
661         // create a snapshot object from the state provided and save it
662         // when snapshot is saved async, SaveSnapshotSuccess is raised.
663
664         Snapshot sn = Snapshot.create(snapshotBytes,
665             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
666             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
667             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
668
669         persistence().saveSnapshot(sn);
670
671         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
672
673         //be greedy and remove entries from in-mem journal which are in the snapshot
674         // and update snapshotIndex and snapshotTerm without waiting for the success,
675
676         context.getReplicatedLog().snapshotPreCommit(
677             captureSnapshot.getLastAppliedIndex(),
678             captureSnapshot.getLastAppliedTerm());
679
680         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
681             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
682             captureSnapshot.getLastAppliedTerm());
683
684         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
685             // this would be call straight to the leader and won't initiate in serialization
686             currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
687                     ByteString.copyFrom(snapshotBytes)));
688         }
689
690         captureSnapshot = null;
691         context.setSnapshotCaptureInitiated(false);
692     }
693
694     protected boolean hasFollowers(){
695         return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
696     }
697
698     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
699
700         private static final int DATA_SIZE_DIVIDER = 5;
701         private long dataSizeSinceLastSnapshot = 0;
702
        /**
         * Creates a replicated log seeded from a snapshot: the snapshot's last
         * applied index and term, plus its un-applied entries, are passed to the
         * superclass constructor.
         *
         * @param snapshot the snapshot to build the log from
         */
703         public ReplicatedLogImpl(Snapshot snapshot) {
704             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
705                 snapshot.getUnAppliedEntries());
706         }
707
708         public ReplicatedLogImpl() {
709             super();
710         }
711
712         @Override public void removeFromAndPersist(long logEntryIndex) {
713             int adjustedIndex = adjustedIndex(logEntryIndex);
714
715             if (adjustedIndex < 0) {
716                 return;
717             }
718
719             // FIXME: Maybe this should be done after the command is saved
720             journal.subList(adjustedIndex , journal.size()).clear();
721
722             persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
723
724                 @Override public void apply(DeleteEntries param)
725                     throws Exception {
726                     //FIXME : Doing nothing for now
727                     dataSize = 0;
728                     for(ReplicatedLogEntry entry : journal){
729                         dataSize += entry.size();
730                     }
731                 }
732             });
733         }
734
735         @Override public void appendAndPersist(
736             final ReplicatedLogEntry replicatedLogEntry) {
737             appendAndPersist(replicatedLogEntry, null);
738         }
739
740         @Override
741         public int dataSize() {
742             return dataSize;
743         }
744
745         public void appendAndPersist(
746             final ReplicatedLogEntry replicatedLogEntry,
747             final Procedure<ReplicatedLogEntry> callback)  {
748
749             if(LOG.isDebugEnabled()) {
750                 LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
751             }
752
753             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
754             journal.add(replicatedLogEntry);
755
756             // When persisting events with persist it is guaranteed that the
757             // persistent actor will not receive further commands between the
758             // persist call and the execution(s) of the associated event
759             // handler. This also holds for multiple persist calls in context
760             // of a single command.
761             persistence().persist(replicatedLogEntry,
762                 new Procedure<ReplicatedLogEntry>() {
763                     @Override
764                     public void apply(ReplicatedLogEntry evt) throws Exception {
765                         int logEntrySize = replicatedLogEntry.size();
766
767                         dataSize += logEntrySize;
768                         long dataSizeForCheck = dataSize;
769
770                         dataSizeSinceLastSnapshot += logEntrySize;
771                         long journalSize = lastIndex()+1;
772
773                         if(!hasFollowers()) {
774                             // When we do not have followers we do not maintain an in-memory log
775                             // due to this the journalSize will never become anything close to the
776                             // snapshot batch count. In fact will mostly be 1.
777                             // Similarly since the journal's dataSize depends on the entries in the
778                             // journal the journal's dataSize will never reach a value close to the
779                             // memory threshold.
780                             // By maintaining the dataSize outside the journal we are tracking essentially
781                             // what we have written to the disk however since we no longer are in
782                             // need of doing a snapshot just for the sake of freeing up memory we adjust
783                             // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
784                             // as if we were maintaining a real snapshot
785                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
786                         }
787
788                         long dataThreshold = Runtime.getRuntime().totalMemory() *
789                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
790
791                         // when a snaphsot is being taken, captureSnapshot != null
792                         if (!context.isSnapshotCaptureInitiated() &&
793                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
794                                         dataSizeForCheck > dataThreshold)) {
795
796                             dataSizeSinceLastSnapshot = 0;
797
798                             LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
799                             long lastAppliedIndex = -1;
800                             long lastAppliedTerm = -1;
801
802                             ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
803                             if (!hasFollowers()) {
804                                 lastAppliedIndex = replicatedLogEntry.getIndex();
805                                 lastAppliedTerm = replicatedLogEntry.getTerm();
806                             } else if (lastAppliedEntry != null) {
807                                 lastAppliedIndex = lastAppliedEntry.getIndex();
808                                 lastAppliedTerm = lastAppliedEntry.getTerm();
809                             }
810
811                             if(LOG.isDebugEnabled()) {
812                                 LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
813                                 LOG.debug("{}: Snapshot Capture lastApplied:{} ",
814                                         persistenceId(), context.getLastApplied());
815                                 LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
816                                         lastAppliedIndex);
817                                 LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
818                                         lastAppliedTerm);
819                             }
820
821                             // send a CaptureSnapshot to self to make the expensive operation async.
822                             getSelf().tell(new CaptureSnapshot(
823                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
824                                 null);
825                             context.setSnapshotCaptureInitiated(true);
826                         }
827                         if(callback != null){
828                             callback.apply(replicatedLogEntry);
829                         }
830                     }
831                 }
832             );
833         }
834
835     }
836
837     static class DeleteEntries implements Serializable {
838         private static final long serialVersionUID = 1L;
839         private final int fromIndex;
840
841         public DeleteEntries(int fromIndex) {
842             this.fromIndex = fromIndex;
843         }
844
845         public int getFromIndex() {
846             return fromIndex;
847         }
848     }
849
850
851     private class ElectionTermImpl implements ElectionTerm {
852         /**
853          * Identifier of the actor whose election term information this is
854          */
855         private long currentTerm = 0;
856         private String votedFor = null;
857
858         @Override
859         public long getCurrentTerm() {
860             return currentTerm;
861         }
862
863         @Override
864         public String getVotedFor() {
865             return votedFor;
866         }
867
868         @Override public void update(long currentTerm, String votedFor) {
869             if(LOG.isDebugEnabled()) {
870                 LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
871             }
872             this.currentTerm = currentTerm;
873             this.votedFor = votedFor;
874         }
875
876         @Override
877         public void updateAndPersist(long currentTerm, String votedFor){
878             update(currentTerm, votedFor);
879             // FIXME : Maybe first persist then update the state
880             persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
881
882                 @Override public void apply(UpdateElectionTerm param)
883                     throws Exception {
884
885                 }
886             });
887         }
888     }
889
890     static class UpdateElectionTerm implements Serializable {
891         private static final long serialVersionUID = 1L;
892         private final long currentTerm;
893         private final String votedFor;
894
895         public UpdateElectionTerm(long currentTerm, String votedFor) {
896             this.currentTerm = currentTerm;
897             this.votedFor = votedFor;
898         }
899
900         public long getCurrentTerm() {
901             return currentTerm;
902         }
903
904         public String getVotedFor() {
905             return votedFor;
906         }
907     }
908
909     protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
910
911         public NonPersistentRaftDataProvider(){
912
913         }
914
915         /**
916          * The way snapshotting works is,
917          * <ol>
918          * <li> RaftActor calls createSnapshot on the Shard
919          * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
920          * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
921          * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
922          * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
923          * in SaveSnapshotSuccess.
924          * </ol>
925          * @param o
926          */
927         @Override
928         public void saveSnapshot(Object o) {
929             // Make saving Snapshot successful
930             commitSnapshot(-1L);
931         }
932     }
933
934     @VisibleForTesting
935     void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
936         currentBehavior = behavior;
937     }
938
939     protected RaftActorBehavior getCurrentBehavior() {
940         return currentBehavior;
941     }
942
943 }