Merge "Startup arch - add odlparent to root pom."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.japi.Procedure;
14 import akka.persistence.RecoveryCompleted;
15 import akka.persistence.SaveSnapshotFailure;
16 import akka.persistence.SaveSnapshotSuccess;
17 import akka.persistence.SnapshotOffer;
18 import akka.persistence.SnapshotSelectionCriteria;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.protobuf.ByteString;
23 import java.io.Serializable;
24 import java.util.Map;
25 import java.util.concurrent.TimeUnit;
26 import org.opendaylight.controller.cluster.DataPersistenceProvider;
27 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
28 import org.opendaylight.controller.cluster.notifications.RoleChanged;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
39 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
40 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
41 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * RaftActor encapsulates a state machine that needs to be kept synchronized
47  * in a cluster. It implements the RAFT algorithm as described in the paper
48  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
49  * In Search of an Understandable Consensus Algorithm</a>
50  * <p/>
51  * RaftActor has 3 states and each state has a certain behavior associated
52  * with it. A Raft actor can behave as,
53  * <ul>
54  * <li> A Leader </li>
55  * <li> A Follower (or) </li>
56  * <li> A Candidate </li>
57  * </ul>
58  * <p/>
59  * <p/>
60  * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
62  * a Leader it is also responsible for ensuring that all followers ultimately
63  * have the same log and therefore the same state machine as itself.
64  * <p/>
65  * <p/>
66  * The current behavior of a RaftActor determines how election for leadership
67  * is initiated and how peer RaftActors react to request for votes.
68  * <p/>
69  * <p/>
70  * Each RaftActor also needs to know the current election term. It uses this
71  * information for a couple of things. One is to simply figure out who it
72  * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
74  * <p/>
75  * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors, a Raft Actor determines
78  * <p/>
79  * <ul>
80  * <li> when a log entry should be persisted </li>
81  * <li> when a log entry should be applied to the state machine (and) </li>
82  * <li> when a snapshot should be saved </li>
83  * </ul>
84  */
85 public abstract class RaftActor extends AbstractUntypedPersistentActor {
86
    /**
     * If more than this much time (50 milliseconds, expressed in nanoseconds) passes between an
     * ApplyState message's start time and its handling, a warning is logged.
     */
    private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis

    // Logger keyed on the runtime (derived) class so that log output names the concrete actor type.
    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /**
     * The current state determines the current behavior of a RaftActor
     * A Raft Actor always starts off in the Follower State.
     * This field stays null until recovery completes and initializeBehavior() installs the Follower.
     */
    private RaftActorBehavior currentBehavior;

    /**
     * This context should NOT be passed directly to any other actor it is
     * only to be consumed by the RaftActorBehaviors
     */
    private final RaftActorContext context;

    /**
     * The in-memory journal. A new instance replaces it (and is pushed into the context)
     * whenever a snapshot is recovered or applied.
     */
    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();

    // The CaptureSnapshot request currently being serviced; null when no capture is in progress.
    private CaptureSnapshot captureSnapshot = null;

    // Measures total recovery time. It is started lazily by the first recovered snapshot or batched
    // entry and reset to null when recovery completes.
    private Stopwatch recoveryTimer;

    // Number of entries appended to the open recovery batch; 0 means no batch is currently open.
    private int currentRecoveryBatchCount;
113
114     public RaftActor(String id, Map<String, String> peerAddresses) {
115         this(id, peerAddresses, Optional.<ConfigParams>absent());
116     }
117
118     public RaftActor(String id, Map<String, String> peerAddresses,
119          Optional<ConfigParams> configParams) {
120
121         context = new RaftActorContextImpl(this.getSelf(),
122             this.getContext(), id, new ElectionTermImpl(),
123             -1, -1, replicatedLog, peerAddresses,
124             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
125             LOG);
126     }
127
128     private void initRecoveryTimer() {
129         if(recoveryTimer == null) {
130             recoveryTimer = Stopwatch.createStarted();
131         }
132     }
133
134     @Override
135     public void preStart() throws Exception {
136         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
137                 context.getConfigParams().getJournalRecoveryLogBatchSize());
138
139         super.preStart();
140     }
141
142     @Override
143     public void handleRecover(Object message) {
144         if(persistence().isRecoveryApplicable()) {
145             if (message instanceof SnapshotOffer) {
146                 onRecoveredSnapshot((SnapshotOffer) message);
147             } else if (message instanceof ReplicatedLogEntry) {
148                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
149             } else if (message instanceof ApplyLogEntries) {
150                 onRecoveredApplyLogEntries((ApplyLogEntries) message);
151             } else if (message instanceof DeleteEntries) {
152                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
153             } else if (message instanceof UpdateElectionTerm) {
154                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
155                         ((UpdateElectionTerm) message).getVotedFor());
156             } else if (message instanceof RecoveryCompleted) {
157                 onRecoveryCompletedMessage();
158             }
159         } else {
160             if (message instanceof RecoveryCompleted) {
161                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
162                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
163                 deleteMessages(lastSequenceNr());
164
165                 // Delete all the akka snapshots as they will not be needed
166                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
167
168                 onRecoveryComplete();
169
170                 initializeBehavior();
171             }
172         }
173     }
174
175     private void onRecoveredSnapshot(SnapshotOffer offer) {
176         if(LOG.isDebugEnabled()) {
177             LOG.debug("{}: SnapshotOffer called..", persistenceId());
178         }
179
180         initRecoveryTimer();
181
182         Snapshot snapshot = (Snapshot) offer.snapshot();
183
184         // Create a replicated log with the snapshot information
185         // The replicated log can be used later on to retrieve this snapshot
186         // when we need to install it on a peer
187         replicatedLog = new ReplicatedLogImpl(snapshot);
188
189         context.setReplicatedLog(replicatedLog);
190         context.setLastApplied(snapshot.getLastAppliedIndex());
191         context.setCommitIndex(snapshot.getLastAppliedIndex());
192
193         Stopwatch timer = Stopwatch.createStarted();
194
195         // Apply the snapshot to the actors state
196         applyRecoverySnapshot(snapshot.getState());
197
198         timer.stop();
199         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
200                 replicatedLog.size(), persistenceId(), timer.toString(),
201                 replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
202     }
203
204     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
205         if(LOG.isDebugEnabled()) {
206             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
207         }
208
209         replicatedLog.append(logEntry);
210     }
211
212     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
213         if(LOG.isDebugEnabled()) {
214             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
215                     persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
216         }
217
218         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
219             batchRecoveredLogEntry(replicatedLog.get(i));
220         }
221
222         context.setLastApplied(ale.getToIndex());
223         context.setCommitIndex(ale.getToIndex());
224     }
225
226     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
227         initRecoveryTimer();
228
229         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
230         if(currentRecoveryBatchCount == 0) {
231             startLogRecoveryBatch(batchSize);
232         }
233
234         appendRecoveredLogEntry(logEntry.getData());
235
236         if(++currentRecoveryBatchCount >= batchSize) {
237             endCurrentLogRecoveryBatch();
238         }
239     }
240
241     private void endCurrentLogRecoveryBatch() {
242         applyCurrentLogRecoveryBatch();
243         currentRecoveryBatchCount = 0;
244     }
245
246     private void onRecoveryCompletedMessage() {
247         if(currentRecoveryBatchCount > 0) {
248             endCurrentLogRecoveryBatch();
249         }
250
251         onRecoveryComplete();
252
253         String recoveryTime = "";
254         if(recoveryTimer != null) {
255             recoveryTimer.stop();
256             recoveryTime = " in " + recoveryTimer.toString();
257             recoveryTimer = null;
258         }
259
260         LOG.info(
261             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
262                 "Persistence Id =  " + persistenceId() +
263                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
264                 "journal-size={}",
265             replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
266             replicatedLog.getSnapshotTerm(), replicatedLog.size());
267
268         initializeBehavior();
269     }
270
271     protected void initializeBehavior(){
272         changeCurrentBehavior(new Follower(context));
273     }
274
275     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
276         RaftActorBehavior oldBehavior = currentBehavior;
277         currentBehavior = newBehavior;
278         handleBehaviorChange(oldBehavior, currentBehavior);
279     }
280
281     @Override public void handleCommand(Object message) {
282         if (message instanceof ApplyState){
283             ApplyState applyState = (ApplyState) message;
284
285             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
286             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
287                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
288                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
289             }
290
291             if(LOG.isDebugEnabled()) {
292                 LOG.debug("{}: Applying state for log index {} data {}",
293                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
294                     applyState.getReplicatedLogEntry().getData());
295             }
296
297             applyState(applyState.getClientActor(), applyState.getIdentifier(),
298                 applyState.getReplicatedLogEntry().getData());
299
300         } else if (message instanceof ApplyLogEntries){
301             ApplyLogEntries ale = (ApplyLogEntries) message;
302             if(LOG.isDebugEnabled()) {
303                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
304             }
305             persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
306                 @Override
307                 public void apply(ApplyLogEntries param) throws Exception {
308                 }
309             });
310
311         } else if(message instanceof ApplySnapshot ) {
312             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
313
314             if(LOG.isDebugEnabled()) {
315                 LOG.debug("{}: ApplySnapshot called on Follower Actor " +
316                         "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
317                     snapshot.getLastAppliedTerm()
318                 );
319             }
320
321             applySnapshot(snapshot.getState());
322
323             //clears the followers log, sets the snapshot index to ensure adjusted-index works
324             replicatedLog = new ReplicatedLogImpl(snapshot);
325             context.setReplicatedLog(replicatedLog);
326             context.setLastApplied(snapshot.getLastAppliedIndex());
327
328         } else if (message instanceof FindLeader) {
329             getSender().tell(
330                 new FindLeaderReply(getLeaderAddress()),
331                 getSelf()
332             );
333
334         } else if (message instanceof SaveSnapshotSuccess) {
335             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
336             LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
337
338             long sequenceNumber = success.metadata().sequenceNr();
339
340             commitSnapshot(sequenceNumber);
341
342         } else if (message instanceof SaveSnapshotFailure) {
343             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
344
345             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
346                     persistenceId(), saveSnapshotFailure.cause());
347
348             context.getReplicatedLog().snapshotRollback();
349
350             LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
351                 "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
352                 context.getReplicatedLog().getSnapshotIndex(),
353                 context.getReplicatedLog().getSnapshotTerm(),
354                 context.getReplicatedLog().size());
355
356         } else if (message instanceof CaptureSnapshot) {
357             LOG.info("{}: CaptureSnapshot received by actor", persistenceId());
358
359             if(captureSnapshot == null) {
360                 captureSnapshot = (CaptureSnapshot)message;
361                 createSnapshot();
362             }
363
364         } else if (message instanceof CaptureSnapshotReply){
365             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
366
367         } else {
368             RaftActorBehavior oldBehavior = currentBehavior;
369             currentBehavior = currentBehavior.handleMessage(getSender(), message);
370
371             handleBehaviorChange(oldBehavior, currentBehavior);
372         }
373     }
374
375     private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
376         if (oldBehavior != currentBehavior){
377             onStateChanged();
378         }
379
380         String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
381         String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
382
383         // it can happen that the state has not changed but the leader has changed.
384         onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
385
386         if (getRoleChangeNotifier().isPresent() &&
387                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
388             getRoleChangeNotifier().get().tell(
389                     new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
390                     getSelf());
391         }
392     }
393
394     /**
395      * When a derived RaftActor needs to persist something it must call
396      * persistData.
397      *
398      * @param clientActor
399      * @param identifier
400      * @param data
401      */
402     protected void persistData(final ActorRef clientActor, final String identifier,
403         final Payload data) {
404
405         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
406             context.getReplicatedLog().lastIndex() + 1,
407             context.getTermInformation().getCurrentTerm(), data);
408
409         if(LOG.isDebugEnabled()) {
410             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
411         }
412
413         final RaftActorContext raftContext = getRaftActorContext();
414
415         replicatedLog
416                 .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
417                     @Override
418                     public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
419                         if(!hasFollowers()){
420                             // Increment the Commit Index and the Last Applied values
421                             raftContext.setCommitIndex(replicatedLogEntry.getIndex());
422                             raftContext.setLastApplied(replicatedLogEntry.getIndex());
423
424                             // Apply the state immediately
425                             applyState(clientActor, identifier, data);
426
427                             // Send a ApplyLogEntries message so that we write the fact that we applied
428                             // the state to durable storage
429                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
430
431                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
432                             if(!context.isSnapshotCaptureInitiated()){
433                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
434                                         raftContext.getTermInformation().getCurrentTerm());
435                                 raftContext.getReplicatedLog().snapshotCommit();
436                             } else {
437                                 LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
438                                         persistenceId(), getId());
439                             }
440                         } else if (clientActor != null) {
441                             // Send message for replication
442                             currentBehavior.handleMessage(getSelf(),
443                                     new Replicate(clientActor, identifier,
444                                             replicatedLogEntry)
445                             );
446                         }
447
448                     }
449                 });    }
450
451     protected String getId() {
452         return context.getId();
453     }
454
455     /**
456      * Derived actors can call the isLeader method to check if the current
457      * RaftActor is the Leader or not
458      *
459      * @return true it this RaftActor is a Leader false otherwise
460      */
461     protected boolean isLeader() {
462         return context.getId().equals(currentBehavior.getLeaderId());
463     }
464
465     /**
466      * Derived actor can call getLeader if they need a reference to the Leader.
467      * This would be useful for example in forwarding a request to an actor
468      * which is the leader
469      *
470      * @return A reference to the leader if known, null otherwise
471      */
472     protected ActorSelection getLeader(){
473         String leaderAddress = getLeaderAddress();
474
475         if(leaderAddress == null){
476             return null;
477         }
478
479         return context.actorSelection(leaderAddress);
480     }
481
482     /**
483      *
484      * @return the current leader's id
485      */
486     protected String getLeaderId(){
487         return currentBehavior.getLeaderId();
488     }
489
490     protected RaftState getRaftState() {
491         return currentBehavior.state();
492     }
493
494     protected ReplicatedLogEntry getLastLogEntry() {
495         return replicatedLog.last();
496     }
497
498     protected Long getCurrentTerm(){
499         return context.getTermInformation().getCurrentTerm();
500     }
501
502     protected Long getCommitIndex(){
503         return context.getCommitIndex();
504     }
505
506     protected Long getLastApplied(){
507         return context.getLastApplied();
508     }
509
510     protected RaftActorContext getRaftActorContext() {
511         return context;
512     }
513
514     /**
515      * setPeerAddress sets the address of a known peer at a later time.
516      * <p>
517      * This is to account for situations where a we know that a peer
518      * exists but we do not know an address up-front. This may also be used in
519      * situations where a known peer starts off in a different location and we
520      * need to change it's address
521      * <p>
522      * Note that if the peerId does not match the list of peers passed to
523      * this actor during construction an IllegalStateException will be thrown.
524      *
525      * @param peerId
526      * @param peerAddress
527      */
528     protected void setPeerAddress(String peerId, String peerAddress){
529         context.setPeerAddress(peerId, peerAddress);
530     }
531
532     protected void commitSnapshot(long sequenceNumber) {
533         context.getReplicatedLog().snapshotCommit();
534
535         // TODO: Not sure if we want to be this aggressive with trimming stuff
536         trimPersistentData(sequenceNumber);
537     }
538
    /**
     * The applyState method will be called by the RaftActor when some data
     * needs to be applied to the actor's state. It is invoked in two cases: when an ApplyState
     * message is handled, and, when there are no followers, directly from the persistence
     * callback set up by persistData.
     *
     * @param clientActor A reference to the client who sent this message. This
     *                    is the same reference that was passed to persistData
     *                    by the derived actor. clientActor may be null
     *                    (persistData accepts a null clientActor; presumably it is
     *                    also null when the entry was replicated from a leader -
     *                    TODO confirm).
     * @param identifier  The identifier of the persisted data. This is also
     *                    the same identifier that was passed to persistData by
     *                    the derived actor. identifier may be null under the
     *                    same circumstances as clientActor.
     * @param data        A piece of data that was persisted by the persistData call.
     *                    This should NEVER be null.
     */
    protected abstract void applyState(ActorRef clientActor, String identifier,
        Object data);
558
    /**
     * This method is called during recovery at the start of a batch of state entries. Derived
     * classes should perform any initialization needed to start a batch.
     *
     * @param maxBatchSize the configured journal recovery batch size. At most this many entries
     *                     are appended before applyCurrentLogRecoveryBatch is called.
     */
    protected abstract void startLogRecoveryBatch(int maxBatchSize);
564
    /**
     * This method is called during recovery to append state data to the current batch. This method
     * is called 1 or more times after {@link #startLogRecoveryBatch}.
     *
     * @param data the state data, i.e. the payload of a recovered log entry
     */
    protected abstract void appendRecoveredLogEntry(Payload data);
572
    /**
     * This method is called during recovery to reconstruct the state of the actor. It is invoked
     * when akka persistence offers a snapshot, after the replicated log has already been rebuilt
     * from that snapshot.
     *
     * @param snapshotBytes A snapshot of the state of the actor
     */
    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
579
    /**
     * This method is called during recovery at the end of a batch to apply the current batched
     * log entries. This method is called after {@link #appendRecoveredLogEntry}. It is called in
     * two situations: when the batch reaches the configured batch size, and once more when
     * recovery completes with a partially filled batch.
     */
    protected abstract void applyCurrentLogRecoveryBatch();
585
    /**
     * This method is called when recovery is complete, before the initial (Follower) behavior is
     * installed. It is also called when persistence recovery is not applicable; in that case it
     * runs after the akka journal and snapshots have been deleted.
     */
    protected abstract void onRecoveryComplete();
590
    /**
     * This method will be called by the RaftActor when a snapshot needs to be
     * created. This happens when a CaptureSnapshot message is received and no other capture is
     * pending. The derived actor should capture its current state.
     * <p/>
     * The captured bytes are expected to come back to this actor as a CaptureSnapshotReply
     * (TODO confirm against implementations), which the RaftActor then persists as a Snapshot.
     * <p/>
     * During recovery the state that was captured will be passed back to the derived actor by
     * calling the applyRecoverySnapshot method.
     */
    protected abstract void createSnapshot();
601
    /**
     * This method can be called at any other point during normal
     * operations when the derived actor is out of sync with its peers
     * and the only way to bring it in sync is by applying a snapshot.
     * It is invoked when an ApplySnapshot message is handled. Afterwards the RaftActor replaces
     * its in-memory replicated log with one built from the same snapshot.
     *
     * @param snapshotBytes A snapshot of the state of the actor
     */
    protected abstract void applySnapshot(byte[] snapshotBytes);
610
    /**
     * This method will be called by the RaftActor when the state of the
     * RaftActor changes, that is, whenever the current behavior instance is replaced. The
     * derived actor can then use methods like isLeader or getLeader to do something useful.
     */
    protected abstract void onStateChanged();
617
    /**
     * @return the DataPersistenceProvider through which this actor persists journal entries and
     *         snapshots. When its isRecoveryApplicable() returns false, handleRecover ignores
     *         replayed data and deletes the akka journal and snapshots once recovery completes.
     */
    protected abstract DataPersistenceProvider persistence();
619
    /**
     * Notifier Actor for this RaftActor to notify when a role change happens.
     * When present, a RoleChanged message is sent to it whenever the raft state changes,
     * including the initial transition out of "no behavior".
     *
     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
     */
    protected abstract Optional<ActorRef> getRoleChangeNotifier();
625
626     protected void onLeaderChanged(String oldLeader, String newLeader){};
627
628     private void trimPersistentData(long sequenceNumber) {
629         // Trim akka snapshots
630         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
631         // For now guessing that it is ANDed.
632         persistence().deleteSnapshots(new SnapshotSelectionCriteria(
633             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
634
635         // Trim akka journal
636         persistence().deleteMessages(sequenceNumber);
637     }
638
639     private String getLeaderAddress(){
640         if(isLeader()){
641             return getSelf().path().toString();
642         }
643         String leaderId = currentBehavior.getLeaderId();
644         if (leaderId == null) {
645             return null;
646         }
647         String peerAddress = context.getPeerAddress(leaderId);
648         if(LOG.isDebugEnabled()) {
649             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
650                     persistenceId(), leaderId, peerAddress);
651         }
652
653         return peerAddress;
654     }
655
656     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
657         LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
658
659         // create a snapshot object from the state provided and save it
660         // when snapshot is saved async, SaveSnapshotSuccess is raised.
661
662         Snapshot sn = Snapshot.create(snapshotBytes,
663             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
664             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
665             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
666
667         persistence().saveSnapshot(sn);
668
669         LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
670
671         long dataThreshold = Runtime.getRuntime().totalMemory() *
672                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
673         if (context.getReplicatedLog().dataSize() > dataThreshold) {
674             // if memory is less, clear the log based on lastApplied.
675             // this could/should only happen if one of the followers is down
676             // as normally we keep removing from the log when its replicated to all.
677             context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
678                     captureSnapshot.getLastAppliedTerm());
679
680             getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
681         } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
682             // clear the log based on replicatedToAllIndex
683             context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
684                     captureSnapshot.getReplicatedToAllTerm());
685
686             getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
687         } else {
688             // The replicatedToAllIndex was not found in the log
689             // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
690             // In this scenario we may need to save the snapshot to the akka persistence
691             // snapshot for recovery but we do not need to do the replicated log trimming.
692             context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
693                     replicatedLog.getSnapshotTerm());
694         }
695
696
697         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
698             "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
699             captureSnapshot.getLastAppliedTerm());
700
701         if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
702             // this would be call straight to the leader and won't initiate in serialization
703             currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
704                     ByteString.copyFrom(snapshotBytes)));
705         }
706
707         captureSnapshot = null;
708         context.setSnapshotCaptureInitiated(false);
709     }
710
711     protected boolean hasFollowers(){
712         return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
713     }
714
    /**
     * In-memory replicated log that also journals each mutation through {@code persistence()}.
     * Both mutating methods change the in-memory {@code journal} before the matching event is
     * persisted (see the FIXMEs below).
     */
    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {

        // Scale-down factor applied to dataSizeSinceLastSnapshot when there are no followers,
        // so that the memory-threshold check triggers snapshots less often in that case.
        private static final int DATA_SIZE_DIVIDER = 5;
        // Running sum of entry.size() for entries persisted since the last snapshot capture was
        // initiated; reset to 0 when appendAndPersist triggers a capture.
        private long dataSizeSinceLastSnapshot = 0;

        /**
         * Creates a log seeded from a snapshot: its last applied index/term and its
         * unapplied entries.
         *
         * @param snapshot the snapshot to start from
         */
        public ReplicatedLogImpl(Snapshot snapshot) {
            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                snapshot.getUnAppliedEntries());
        }

        /**
         * Creates a log using the superclass defaults.
         */
        public ReplicatedLogImpl() {
            super();
        }

        /**
         * Removes the entry for {@code logEntryIndex} and every later entry from the in-memory
         * journal. It then persists a {@code DeleteEntries} event carrying the adjusted
         * (journal-position) index. Does nothing when the adjusted index is negative.
         *
         * @param logEntryIndex log index of the first entry to remove
         */
        @Override public void removeFromAndPersist(long logEntryIndex) {
            int adjustedIndex = adjustedIndex(logEntryIndex);

            // NOTE(review): presumably a negative adjusted index means the entry is not in the
            // in-memory journal (e.g. already covered by a snapshot) - confirm in adjustedIndex().
            if (adjustedIndex < 0) {
                return;
            }

            // FIXME: Maybe this should be done after the command is saved
            journal.subList(adjustedIndex , journal.size()).clear();

            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {

                @Override
                public void apply(DeleteEntries param)
                        throws Exception {
                    // Once the delete is persisted, recompute dataSize from the entries that
                    // remain in the journal.
                    dataSize = 0;
                    for (ReplicatedLogEntry entry : journal) {
                        dataSize += entry.size();
                    }
                }
            });
        }

        /**
         * Appends and persists an entry without a completion callback.
         *
         * @param replicatedLogEntry the entry to append
         */
        @Override public void appendAndPersist(
            final ReplicatedLogEntry replicatedLogEntry) {
            appendAndPersist(replicatedLogEntry, null);
        }

        /**
         * Appends {@code replicatedLogEntry} to the in-memory journal and persists it. The persist
         * handler then does three things, in order:
         * <ol>
         * <li>updates the data-size counters;</li>
         * <li>initiates a snapshot capture if the batch-count or data-size threshold is reached
         * and no capture is already in progress;</li>
         * <li>invokes {@code callback} if it is non-null.</li>
         * </ol>
         *
         * @param replicatedLogEntry the entry to append
         * @param callback invoked with the entry after it has been persisted; may be {@code null}
         */
        public void appendAndPersist(
            final ReplicatedLogEntry replicatedLogEntry,
            final Procedure<ReplicatedLogEntry> callback)  {

            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
            }

            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
            journal.add(replicatedLogEntry);

            // When persisting events with persist it is guaranteed that the
            // persistent actor will not receive further commands between the
            // persist call and the execution(s) of the associated event
            // handler. This also holds for multiple persist calls in context
            // of a single command.
            persistence().persist(replicatedLogEntry,
                new Procedure<ReplicatedLogEntry>() {
                    @Override
                    public void apply(ReplicatedLogEntry evt) throws Exception {
                        int logEntrySize = replicatedLogEntry.size();

                        dataSize += logEntrySize;
                        long dataSizeForCheck = dataSize;

                        dataSizeSinceLastSnapshot += logEntrySize;
                        // Log size derived from the last index rather than from journal.size().
                        long journalSize = lastIndex() + 1;

                        if(!hasFollowers()) {
                            // When we do not have followers we do not maintain an in-memory log
                            // due to this the journalSize will never become anything close to the
                            // snapshot batch count. In fact will mostly be 1.
                            // Similarly since the journal's dataSize depends on the entries in the
                            // journal the journal's dataSize will never reach a value close to the
                            // memory threshold.
                            // By maintaining the dataSize outside the journal we are tracking essentially
                            // what we have written to the disk however since we no longer are in
                            // need of doing a snapshot just for the sake of freeing up memory we adjust
                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
                            // as if we were maintaining a real snapshot
                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                        }

                        // Memory threshold: the configured percentage of the JVM's current total memory.
                        long dataThreshold = Runtime.getRuntime().totalMemory() *
                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;

                        // Initiate a capture only if none is already in progress, and either the log
                        // size is a multiple of the snapshot batch count or the data size exceeds
                        // the threshold.
                        if (!context.isSnapshotCaptureInitiated() &&
                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
                                        dataSizeForCheck > dataThreshold)) {

                            dataSizeSinceLastSnapshot = 0;

                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);

                            // Without followers, snapshot up to the entry just persisted. Otherwise
                            // snapshot up to the last applied entry if it is still in the journal;
                            // if it is not, both values stay -1.
                            long lastAppliedIndex = -1;
                            long lastAppliedTerm = -1;

                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
                            if (!hasFollowers()) {
                                lastAppliedIndex = replicatedLogEntry.getIndex();
                                lastAppliedTerm = replicatedLogEntry.getTerm();
                            } else if (lastAppliedEntry != null) {
                                lastAppliedIndex = lastAppliedEntry.getIndex();
                                lastAppliedTerm = lastAppliedEntry.getTerm();
                            }

                            if(LOG.isDebugEnabled()) {
                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
                                        persistenceId(), context.getLastApplied());
                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
                                        lastAppliedIndex);
                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
                                        lastAppliedTerm);
                            }

                            // send a CaptureSnapshot to self to make the expensive operation async.
                            // The replicatedToAll index/term are sent as -1 when that entry is not in the log.
                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
                                null);
                            // Mark the capture as in progress so later appends do not trigger another one.
                            context.setSnapshotCaptureInitiated(true);
                        }
                        // The caller's callback runs last, after the snapshot bookkeeping above.
                        if (callback != null){
                            callback.apply(replicatedLogEntry);
                        }
                    }
                }
            );
        }

    }
854
855     static class DeleteEntries implements Serializable {
856         private static final long serialVersionUID = 1L;
857         private final int fromIndex;
858
859         public DeleteEntries(int fromIndex) {
860             this.fromIndex = fromIndex;
861         }
862
863         public int getFromIndex() {
864             return fromIndex;
865         }
866     }
867
868
869     private class ElectionTermImpl implements ElectionTerm {
870         /**
871          * Identifier of the actor whose election term information this is
872          */
873         private long currentTerm = 0;
874         private String votedFor = null;
875
876         @Override
877         public long getCurrentTerm() {
878             return currentTerm;
879         }
880
881         @Override
882         public String getVotedFor() {
883             return votedFor;
884         }
885
886         @Override public void update(long currentTerm, String votedFor) {
887             if(LOG.isDebugEnabled()) {
888                 LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
889             }
890             this.currentTerm = currentTerm;
891             this.votedFor = votedFor;
892         }
893
894         @Override
895         public void updateAndPersist(long currentTerm, String votedFor){
896             update(currentTerm, votedFor);
897             // FIXME : Maybe first persist then update the state
898             persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
899
900                 @Override public void apply(UpdateElectionTerm param)
901                     throws Exception {
902
903                 }
904             });
905         }
906     }
907
908     static class UpdateElectionTerm implements Serializable {
909         private static final long serialVersionUID = 1L;
910         private final long currentTerm;
911         private final String votedFor;
912
913         public UpdateElectionTerm(long currentTerm, String votedFor) {
914             this.currentTerm = currentTerm;
915             this.votedFor = votedFor;
916         }
917
918         public long getCurrentTerm() {
919             return currentTerm;
920         }
921
922         public String getVotedFor() {
923             return votedFor;
924         }
925     }
926
927     protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
928
929         public NonPersistentRaftDataProvider(){
930
931         }
932
933         /**
934          * The way snapshotting works is,
935          * <ol>
936          * <li> RaftActor calls createSnapshot on the Shard
937          * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
938          * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
939          * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
940          * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
941          * in SaveSnapshotSuccess.
942          * </ol>
943          * @param o
944          */
945         @Override
946         public void saveSnapshot(Object o) {
947             // Make saving Snapshot successful
948             commitSnapshot(-1L);
949         }
950     }
951
952     @VisibleForTesting
953     void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
954         currentBehavior = behavior;
955     }
956
957     protected RaftActorBehavior getCurrentBehavior() {
958         return currentBehavior;
959     }
960
961 }