Dynamically update DatastoreContext properties
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.japi.Procedure;
14 import akka.persistence.RecoveryCompleted;
15 import akka.persistence.SaveSnapshotFailure;
16 import akka.persistence.SaveSnapshotSuccess;
17 import akka.persistence.SnapshotOffer;
18 import akka.persistence.SnapshotSelectionCriteria;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Stopwatch;
22 import com.google.protobuf.ByteString;
23 import java.io.Serializable;
24 import java.util.Map;
25 import java.util.concurrent.TimeUnit;
26 import org.opendaylight.controller.cluster.DataPersistenceProvider;
27 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
28 import org.opendaylight.controller.cluster.notifications.RoleChanged;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
30 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
35 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
36 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
37 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
38 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
39 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
40 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
41 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * RaftActor encapsulates a state machine that needs to be kept synchronized
47  * in a cluster. It implements the RAFT algorithm as described in the paper
48  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
49  * In Search of an Understandable Consensus Algorithm</a>
50  * <p/>
51  * RaftActor has 3 states and each state has a certain behavior associated
52  * with it. A Raft actor can behave as,
53  * <ul>
54  * <li> A Leader </li>
55  * <li> A Follower (or) </li>
56  * <li> A Candidate </li>
57  * </ul>
58  * <p/>
59  * <p/>
60  * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
62  * a Leader it is also responsible for ensuring that all followers ultimately
63  * have the same log and therefore the same state machine as itself.
64  * <p/>
65  * <p/>
66  * The current behavior of a RaftActor determines how election for leadership
67  * is initiated and how peer RaftActors react to request for votes.
68  * <p/>
69  * <p/>
70  * Each RaftActor also needs to know the current election term. It uses this
71  * information for a couple of things. One is to simply figure out who it
72  * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
74  * <p/>
75  * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a RaftActor determines
78  * <p/>
79  * <ul>
80  * <li> when a log entry should be persisted </li>
81  * <li> when a log entry should be applied to the state machine (and) </li>
82  * <li> when a snapshot should be saved </li>
83  * </ul>
84  */
85 public abstract class RaftActor extends AbstractUntypedPersistentActor {
86
    // ApplyState messages whose delivery took at least this long (measured from the start time
    // carried in the message) are logged as a warning in handleCommand.
    private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis

    // Logger named after the concrete (derived) class, since getClass() is used.
    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /**
     * The current state determines the current behavior of a RaftActor.
     * A Raft Actor always starts off in the Follower State (see initializeBehavior).
     * This is null until recovery has completed and the initial behavior is installed.
     */
    private RaftActorBehavior currentBehavior;

    /**
     * This context should NOT be passed directly to any other actor; it is
     * only to be consumed by the RaftActorBehaviors.
     */
    private final RaftActorContextImpl context;

    /**
     * The in-memory journal. It is replaced wholesale when a snapshot is recovered or
     * applied, and the new instance is registered with the context each time.
     */
    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();

    // Non-null while a snapshot capture is in progress: set when a CaptureSnapshot arrives
    // and cleared at the end of handleCaptureSnapshotReply.
    private CaptureSnapshot captureSnapshot = null;

    // Measures total recovery time; created lazily by initRecoveryTimer and stopped/cleared
    // once recovery completes.
    private Stopwatch recoveryTimer;

    // Number of recovered log entries appended to the current recovery batch; reset to 0
    // whenever the batch is applied.
    private int currentRecoveryBatchCount;
113
114     public RaftActor(String id, Map<String, String> peerAddresses) {
115         this(id, peerAddresses, Optional.<ConfigParams>absent());
116     }
117
118     public RaftActor(String id, Map<String, String> peerAddresses,
119          Optional<ConfigParams> configParams) {
120
121         context = new RaftActorContextImpl(this.getSelf(),
122             this.getContext(), id, new ElectionTermImpl(),
123             -1, -1, replicatedLog, peerAddresses,
124             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
125             LOG);
126     }
127
128     private void initRecoveryTimer() {
129         if(recoveryTimer == null) {
130             recoveryTimer = Stopwatch.createStarted();
131         }
132     }
133
134     @Override
135     public void preStart() throws Exception {
136         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
137                 context.getConfigParams().getJournalRecoveryLogBatchSize());
138
139         super.preStart();
140     }
141
142     @Override
143     public void postStop() {
144         if(currentBehavior != null) {
145             try {
146                 currentBehavior.close();
147             } catch (Exception e) {
148                 LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
149             }
150         }
151
152         super.postStop();
153     }
154
155     @Override
156     public void handleRecover(Object message) {
157         if(persistence().isRecoveryApplicable()) {
158             if (message instanceof SnapshotOffer) {
159                 onRecoveredSnapshot((SnapshotOffer) message);
160             } else if (message instanceof ReplicatedLogEntry) {
161                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
162             } else if (message instanceof ApplyLogEntries) {
163                 onRecoveredApplyLogEntries((ApplyLogEntries) message);
164             } else if (message instanceof DeleteEntries) {
165                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
166             } else if (message instanceof UpdateElectionTerm) {
167                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
168                         ((UpdateElectionTerm) message).getVotedFor());
169             } else if (message instanceof RecoveryCompleted) {
170                 onRecoveryCompletedMessage();
171             }
172         } else {
173             if (message instanceof RecoveryCompleted) {
174                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
175                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
176                 deleteMessages(lastSequenceNr());
177
178                 // Delete all the akka snapshots as they will not be needed
179                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
180
181                 onRecoveryComplete();
182
183                 initializeBehavior();
184             }
185         }
186     }
187
188     private void onRecoveredSnapshot(SnapshotOffer offer) {
189         if(LOG.isDebugEnabled()) {
190             LOG.debug("{}: SnapshotOffer called..", persistenceId());
191         }
192
193         initRecoveryTimer();
194
195         Snapshot snapshot = (Snapshot) offer.snapshot();
196
197         // Create a replicated log with the snapshot information
198         // The replicated log can be used later on to retrieve this snapshot
199         // when we need to install it on a peer
200         replicatedLog = new ReplicatedLogImpl(snapshot);
201
202         context.setReplicatedLog(replicatedLog);
203         context.setLastApplied(snapshot.getLastAppliedIndex());
204         context.setCommitIndex(snapshot.getLastAppliedIndex());
205
206         Stopwatch timer = Stopwatch.createStarted();
207
208         // Apply the snapshot to the actors state
209         applyRecoverySnapshot(snapshot.getState());
210
211         timer.stop();
212         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
213                 replicatedLog.size(), persistenceId(), timer.toString(),
214                 replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
215     }
216
217     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
218         if(LOG.isDebugEnabled()) {
219             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
220         }
221
222         replicatedLog.append(logEntry);
223     }
224
225     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
226         if(LOG.isDebugEnabled()) {
227             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
228                     persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
229         }
230
231         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
232             batchRecoveredLogEntry(replicatedLog.get(i));
233         }
234
235         context.setLastApplied(ale.getToIndex());
236         context.setCommitIndex(ale.getToIndex());
237     }
238
239     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
240         initRecoveryTimer();
241
242         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
243         if(currentRecoveryBatchCount == 0) {
244             startLogRecoveryBatch(batchSize);
245         }
246
247         appendRecoveredLogEntry(logEntry.getData());
248
249         if(++currentRecoveryBatchCount >= batchSize) {
250             endCurrentLogRecoveryBatch();
251         }
252     }
253
254     private void endCurrentLogRecoveryBatch() {
255         applyCurrentLogRecoveryBatch();
256         currentRecoveryBatchCount = 0;
257     }
258
259     private void onRecoveryCompletedMessage() {
260         if(currentRecoveryBatchCount > 0) {
261             endCurrentLogRecoveryBatch();
262         }
263
264         onRecoveryComplete();
265
266         String recoveryTime = "";
267         if(recoveryTimer != null) {
268             recoveryTimer.stop();
269             recoveryTime = " in " + recoveryTimer.toString();
270             recoveryTimer = null;
271         }
272
273         LOG.info(
274             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
275                 "Persistence Id =  " + persistenceId() +
276                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
277                 "journal-size={}",
278             replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
279             replicatedLog.getSnapshotTerm(), replicatedLog.size());
280
281         initializeBehavior();
282     }
283
284     protected void initializeBehavior(){
285         changeCurrentBehavior(new Follower(context));
286     }
287
288     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
289         RaftActorBehavior oldBehavior = currentBehavior;
290         currentBehavior = newBehavior;
291         handleBehaviorChange(oldBehavior, currentBehavior);
292     }
293
    /**
     * Handles commands delivered to this actor after recovery.
     * <p>
     * The RaftActor handles these messages itself:
     * <ul>
     * <li>ApplyState</li>
     * <li>ApplyLogEntries</li>
     * <li>ApplySnapshot</li>
     * <li>FindLeader</li>
     * <li>SaveSnapshotSuccess and SaveSnapshotFailure</li>
     * <li>CaptureSnapshot and CaptureSnapshotReply</li>
     * </ul>
     * Any other message goes to the current behavior. The behavior returns the behavior to use
     * next, and the change notifications are then fired.
     */
    @Override public void handleCommand(Object message) {
        if (message instanceof ApplyState){
            ApplyState applyState = (ApplyState) message;

            // Warn when the time since the start time recorded in the message reaches the threshold.
            long elapsedTime = (System.nanoTime() - applyState.getStartTime());
            if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
                LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
                        TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
            }

            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: Applying state for log index {} data {}",
                    persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
                    applyState.getReplicatedLogEntry().getData());
            }

            // Hand the entry's data to the derived actor.
            applyState(applyState.getClientActor(), applyState.getIdentifier(),
                applyState.getReplicatedLogEntry().getData());

        } else if (message instanceof ApplyLogEntries){
            ApplyLogEntries ale = (ApplyLogEntries) message;
            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), ale.getToIndex());
            }
            // Durably record the index up to which entries have been applied. During recovery
            // this marker is replayed by onRecoveredApplyLogEntries. The persist callback
            // intentionally does nothing.
            persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                @Override
                public void apply(ApplyLogEntries param) throws Exception {
                }
            });

        } else if(message instanceof ApplySnapshot ) {
            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();

            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
                    snapshot.getLastAppliedTerm()
                );
            }

            // Let the derived actor replace its state with the snapshot's state.
            applySnapshot(snapshot.getState());

            //clears the followers log, sets the snapshot index to ensure adjusted-index works
            // NOTE(review): unlike onRecoveredSnapshot, commitIndex is not updated here -- confirm
            // that this is intentional.
            replicatedLog = new ReplicatedLogImpl(snapshot);
            context.setReplicatedLog(replicatedLog);
            context.setLastApplied(snapshot.getLastAppliedIndex());

        } else if (message instanceof FindLeader) {
            // Reply with the leader's address, or null if no leader is known.
            getSender().tell(
                new FindLeaderReply(getLeaderAddress()),
                getSelf()
            );

        } else if (message instanceof SaveSnapshotSuccess) {
            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());

            long sequenceNumber = success.metadata().sequenceNr();

            // Commit the pending in-memory snapshot and prune persisted data up to this sequence number.
            commitSnapshot(sequenceNumber);

        } else if (message instanceof SaveSnapshotFailure) {
            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;

            // The cause is the trailing argument, so SLF4J logs it as the exception.
            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                    persistenceId(), saveSnapshotFailure.cause());

            // Undo the pending snapshot pre-commit on the in-memory log.
            context.getReplicatedLog().snapshotRollback();

            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
                context.getReplicatedLog().getSnapshotIndex(),
                context.getReplicatedLog().getSnapshotTerm(),
                context.getReplicatedLog().size());

        } else if (message instanceof CaptureSnapshot) {
            LOG.info("{}: CaptureSnapshot received by actor", persistenceId());

            // Only one capture at a time. Requests arriving while one is pending are ignored.
            if(captureSnapshot == null) {
                captureSnapshot = (CaptureSnapshot)message;
                createSnapshot();
            }

        } else if (message instanceof CaptureSnapshotReply){
            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());

        } else {
            // Anything else is raft protocol traffic for the current behavior. The behavior
            // returns the behavior to switch to, which may be itself.
            RaftActorBehavior oldBehavior = currentBehavior;
            currentBehavior = currentBehavior.handleMessage(getSender(), message);

            handleBehaviorChange(oldBehavior, currentBehavior);
        }
    }
387
388     private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
389         if (oldBehavior != currentBehavior){
390             onStateChanged();
391         }
392
393         String oldBehaviorLeaderId = oldBehavior == null? null : oldBehavior.getLeaderId();
394         String oldBehaviorState = oldBehavior == null? null : oldBehavior.state().name();
395
396         // it can happen that the state has not changed but the leader has changed.
397         onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
398
399         if (getRoleChangeNotifier().isPresent() &&
400                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
401             getRoleChangeNotifier().get().tell(
402                     new RoleChanged(getId(), oldBehaviorState , currentBehavior.state().name()),
403                     getSelf());
404         }
405     }
406
407     /**
408      * When a derived RaftActor needs to persist something it must call
409      * persistData.
410      *
411      * @param clientActor
412      * @param identifier
413      * @param data
414      */
415     protected void persistData(final ActorRef clientActor, final String identifier,
416         final Payload data) {
417
418         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
419             context.getReplicatedLog().lastIndex() + 1,
420             context.getTermInformation().getCurrentTerm(), data);
421
422         if(LOG.isDebugEnabled()) {
423             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
424         }
425
426         final RaftActorContext raftContext = getRaftActorContext();
427
428         replicatedLog
429                 .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
430                     @Override
431                     public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
432                         if(!hasFollowers()){
433                             // Increment the Commit Index and the Last Applied values
434                             raftContext.setCommitIndex(replicatedLogEntry.getIndex());
435                             raftContext.setLastApplied(replicatedLogEntry.getIndex());
436
437                             // Apply the state immediately
438                             applyState(clientActor, identifier, data);
439
440                             // Send a ApplyLogEntries message so that we write the fact that we applied
441                             // the state to durable storage
442                             self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
443
444                             // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
445                             if(!context.isSnapshotCaptureInitiated()){
446                                 raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
447                                         raftContext.getTermInformation().getCurrentTerm());
448                                 raftContext.getReplicatedLog().snapshotCommit();
449                             } else {
450                                 LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
451                                         persistenceId(), getId());
452                             }
453                         } else if (clientActor != null) {
454                             // Send message for replication
455                             currentBehavior.handleMessage(getSelf(),
456                                     new Replicate(clientActor, identifier,
457                                             replicatedLogEntry)
458                             );
459                         }
460
461                     }
462                 });    }
463
464     protected String getId() {
465         return context.getId();
466     }
467
468     /**
469      * Derived actors can call the isLeader method to check if the current
470      * RaftActor is the Leader or not
471      *
472      * @return true it this RaftActor is a Leader false otherwise
473      */
474     protected boolean isLeader() {
475         return context.getId().equals(currentBehavior.getLeaderId());
476     }
477
478     /**
479      * Derived actor can call getLeader if they need a reference to the Leader.
480      * This would be useful for example in forwarding a request to an actor
481      * which is the leader
482      *
483      * @return A reference to the leader if known, null otherwise
484      */
485     protected ActorSelection getLeader(){
486         String leaderAddress = getLeaderAddress();
487
488         if(leaderAddress == null){
489             return null;
490         }
491
492         return context.actorSelection(leaderAddress);
493     }
494
495     /**
496      *
497      * @return the current leader's id
498      */
499     protected String getLeaderId(){
500         return currentBehavior.getLeaderId();
501     }
502
503     protected RaftState getRaftState() {
504         return currentBehavior.state();
505     }
506
507     protected ReplicatedLogEntry getLastLogEntry() {
508         return replicatedLog.last();
509     }
510
511     protected Long getCurrentTerm(){
512         return context.getTermInformation().getCurrentTerm();
513     }
514
515     protected Long getCommitIndex(){
516         return context.getCommitIndex();
517     }
518
519     protected Long getLastApplied(){
520         return context.getLastApplied();
521     }
522
523     protected RaftActorContext getRaftActorContext() {
524         return context;
525     }
526
527     protected void updateConfigParams(ConfigParams configParams) {
528         context.setConfigParams(configParams);
529     }
530
531     /**
532      * setPeerAddress sets the address of a known peer at a later time.
533      * <p>
534      * This is to account for situations where a we know that a peer
535      * exists but we do not know an address up-front. This may also be used in
536      * situations where a known peer starts off in a different location and we
537      * need to change it's address
538      * <p>
539      * Note that if the peerId does not match the list of peers passed to
540      * this actor during construction an IllegalStateException will be thrown.
541      *
542      * @param peerId
543      * @param peerAddress
544      */
545     protected void setPeerAddress(String peerId, String peerAddress){
546         context.setPeerAddress(peerId, peerAddress);
547     }
548
549     protected void commitSnapshot(long sequenceNumber) {
550         context.getReplicatedLog().snapshotCommit();
551
552         // TODO: Not sure if we want to be this aggressive with trimming stuff
553         trimPersistentData(sequenceNumber);
554     }
555
    /**
     * The applyState method will be called by the RaftActor when some data
     * needs to be applied to the actor's state.
     *
     * @param clientActor A reference to the client who sent this message. This
     *                    is the same reference that was passed to persistData
     *                    by the derived actor. clientActor may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param identifier  The identifier of the persisted data. This is also
     *                    the same identifier that was passed to persistData by
     *                    the derived actor. identifier may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param data        A piece of data that was persisted by the persistData call.
     *                    This should NEVER be null.
     */
    protected abstract void applyState(ActorRef clientActor, String identifier,
        Object data);

    /**
     * This method is called during recovery at the start of a batch of state entries. Derived
     * classes should perform any initialization needed to start a batch.
     *
     * @param maxBatchSize the configured journal recovery batch size, i.e. the maximum number
     *                     of entries appended before the batch is applied
     */
    protected abstract void startLogRecoveryBatch(int maxBatchSize);

    /**
     * This method is called during recovery to append state data to the current batch. This method
     * is called 1 or more times after {@link #startLogRecoveryBatch}.
     *
     * @param data the state data
     */
    protected abstract void appendRecoveredLogEntry(Payload data);

    /**
     * This method is called during recovery to reconstruct the state of the actor from a
     * recovered snapshot.
     *
     * @param snapshotBytes A snapshot of the state of the actor
     */
    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);

    /**
     * This method is called during recovery at the end of a batch to apply the current batched
     * log entries. This method is called after {@link #appendRecoveredLogEntry}. It is called
     * either when the batch reaches its maximum size or, for a partial batch, when recovery
     * completes.
     */
    protected abstract void applyCurrentLogRecoveryBatch();

    /**
     * This method is called when recovery is complete. It is also called when recovery is not
     * applicable, after the persisted journal and snapshots have been deleted.
     */
    protected abstract void onRecoveryComplete();

    /**
     * This method will be called by the RaftActor when a snapshot needs to be
     * created, i.e. when a CaptureSnapshot message is received and no capture is
     * already pending.
     * <p/>
     * The derived actor does not return its state from this method. The captured
     * state is expected to be delivered back to this actor as a CaptureSnapshotReply
     * message, which is then handled by handleCaptureSnapshotReply.
     * <p/>
     * During recovery the saved state is passed back to the derived actor through
     * {@link #applyRecoverySnapshot}.
     */
    protected abstract void createSnapshot();

    /**
     * This method can be called at any other point during normal
     * operations when the derived actor is out of sync with its peers
     * and the only way to bring it in sync is by applying a snapshot.
     * It is invoked when an ApplySnapshot message is handled.
     *
     * @param snapshotBytes A snapshot of the state of the actor
     */
    protected abstract void applySnapshot(byte[] snapshotBytes);

    /**
     * This method will be called by the RaftActor when the state of the
     * RaftActor changes, i.e. when the current behavior instance is replaced.
     * The derived actor can then use methods like isLeader or getLeader to
     * do something useful.
     */
    protected abstract void onStateChanged();

    /**
     * Returns the provider used for all persistence operations: persisting journal
     * messages, saving and deleting snapshots, and deleting messages. Its
     * isRecoveryApplicable() result decides whether persisted data is replayed on recovery.
     *
     * @return the persistence provider for this actor
     */
    protected abstract DataPersistenceProvider persistence();

    /**
     * Notifier Actor for this RaftActor to notify when a role change happens.
     * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
     */
    protected abstract Optional<ActorRef> getRoleChangeNotifier();
642
643     protected void onLeaderChanged(String oldLeader, String newLeader){};
644
645     private void trimPersistentData(long sequenceNumber) {
646         // Trim akka snapshots
647         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
648         // For now guessing that it is ANDed.
649         persistence().deleteSnapshots(new SnapshotSelectionCriteria(
650             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
651
652         // Trim akka journal
653         persistence().deleteMessages(sequenceNumber);
654     }
655
656     private String getLeaderAddress(){
657         if(isLeader()){
658             return getSelf().path().toString();
659         }
660         String leaderId = currentBehavior.getLeaderId();
661         if (leaderId == null) {
662             return null;
663         }
664         String peerAddress = context.getPeerAddress(leaderId);
665         if(LOG.isDebugEnabled()) {
666             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
667                     persistenceId(), leaderId, peerAddress);
668         }
669
670         return peerAddress;
671     }
672
    /**
     * Completes a snapshot capture using the state bytes produced by the derived actor.
     * <p>
     * Steps:
     * <ol>
     * <li>Build a Snapshot from the bytes, the log entries after the captured lastApplied
     * index, and the captured indexes and terms.</li>
     * <li>Ask the persistence provider to save it.</li>
     * <li>Pre-commit the in-memory log at an index chosen by memory pressure and
     * replicatedToAllIndex.</li>
     * <li>If this actor is the leader and an install-snapshot was initiated, hand the bytes to
     * the leader behavior.</li>
     * <li>Clear the capture state.</li>
     * </ol>
     * NOTE(review): this assumes captureSnapshot is non-null, i.e. that a CaptureSnapshot was
     * received first. A stray CaptureSnapshotReply would throw a NullPointerException here.
     *
     * @param snapshotBytes the serialized state of the derived actor
     */
    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
        LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);

        // create a snapshot object from the state provided and save it
        // when snapshot is saved async, SaveSnapshotSuccess is raised.

        // Entries after the captured lastApplied index are stored in the snapshot as unapplied entries.
        Snapshot sn = Snapshot.create(snapshotBytes,
            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());

        persistence().saveSnapshot(sn);

        // Per the comment above, the save completes asynchronously; the result arrives later as
        // SaveSnapshotSuccess or SaveSnapshotFailure.
        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());

        // Threshold = configured percentage of the JVM's current total memory.
        long dataThreshold = Runtime.getRuntime().totalMemory() *
                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
        if (context.getReplicatedLog().dataSize() > dataThreshold) {
            // if memory is less, clear the log based on lastApplied.
            // this could/should only happen if one of the followers is down
            // as normally we keep removing from the log when its replicated to all.
            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
                    captureSnapshot.getLastAppliedTerm());

            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
            // clear the log based on replicatedToAllIndex
            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
                    captureSnapshot.getReplicatedToAllTerm());

            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
        } else {
            // The replicatedToAllIndex was not found in the log
            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
            // In this scenario we may need to save the snapshot to the akka persistence
            // snapshot for recovery but we do not need to do the replicated log trimming.
            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
                    replicatedLog.getSnapshotTerm());
        }


        // NOTE(review): this always reports the captured lastApplied index/term, even when the
        // branches above pre-committed at replicatedToAllIndex or at the existing snapshot index.
        // The logged values may therefore not match the actual adjustment.
        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
            "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
            captureSnapshot.getLastAppliedTerm());

        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
            // Call the leader behavior directly rather than sending an actor message, so the
            // snapshot bytes are not serialized.
            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
                    ByteString.copyFrom(snapshotBytes)));
        }

        // The capture is finished; allow the next CaptureSnapshot to start a new one.
        captureSnapshot = null;
        context.setSnapshotCaptureInitiated(false);
    }
727
728     protected boolean hasFollowers(){
729         return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
730     }
731
732     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
733
734         private static final int DATA_SIZE_DIVIDER = 5;
735         private long dataSizeSinceLastSnapshot = 0;
736
737         public ReplicatedLogImpl(Snapshot snapshot) {
738             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
739                 snapshot.getUnAppliedEntries());
740         }
741
742         public ReplicatedLogImpl() {
743             super();
744         }
745
746         @Override public void removeFromAndPersist(long logEntryIndex) {
747             int adjustedIndex = adjustedIndex(logEntryIndex);
748
749             if (adjustedIndex < 0) {
750                 return;
751             }
752
753             // FIXME: Maybe this should be done after the command is saved
754             journal.subList(adjustedIndex , journal.size()).clear();
755
756             persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
757
758                 @Override
759                 public void apply(DeleteEntries param)
760                         throws Exception {
761                     //FIXME : Doing nothing for now
762                     dataSize = 0;
763                     for (ReplicatedLogEntry entry : journal) {
764                         dataSize += entry.size();
765                     }
766                 }
767             });
768         }
769
770         @Override public void appendAndPersist(
771             final ReplicatedLogEntry replicatedLogEntry) {
772             appendAndPersist(replicatedLogEntry, null);
773         }
774
775         public void appendAndPersist(
776             final ReplicatedLogEntry replicatedLogEntry,
777             final Procedure<ReplicatedLogEntry> callback)  {
778
779             if(LOG.isDebugEnabled()) {
780                 LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
781             }
782
783             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
784             journal.add(replicatedLogEntry);
785
786             // When persisting events with persist it is guaranteed that the
787             // persistent actor will not receive further commands between the
788             // persist call and the execution(s) of the associated event
789             // handler. This also holds for multiple persist calls in context
790             // of a single command.
791             persistence().persist(replicatedLogEntry,
792                 new Procedure<ReplicatedLogEntry>() {
793                     @Override
794                     public void apply(ReplicatedLogEntry evt) throws Exception {
795                         int logEntrySize = replicatedLogEntry.size();
796
797                         dataSize += logEntrySize;
798                         long dataSizeForCheck = dataSize;
799
800                         dataSizeSinceLastSnapshot += logEntrySize;
801                         long journalSize = lastIndex() + 1;
802
803                         if(!hasFollowers()) {
804                             // When we do not have followers we do not maintain an in-memory log
805                             // due to this the journalSize will never become anything close to the
806                             // snapshot batch count. In fact will mostly be 1.
807                             // Similarly since the journal's dataSize depends on the entries in the
808                             // journal the journal's dataSize will never reach a value close to the
809                             // memory threshold.
810                             // By maintaining the dataSize outside the journal we are tracking essentially
811                             // what we have written to the disk however since we no longer are in
812                             // need of doing a snapshot just for the sake of freeing up memory we adjust
813                             // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
814                             // as if we were maintaining a real snapshot
815                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
816                         }
817
818                         long dataThreshold = Runtime.getRuntime().totalMemory() *
819                                 getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
820
821                         // when a snaphsot is being taken, captureSnapshot != null
822                         if (!context.isSnapshotCaptureInitiated() &&
823                                 ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
824                                         dataSizeForCheck > dataThreshold)) {
825
826                             dataSizeSinceLastSnapshot = 0;
827
828                             LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
829                                 " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
830
831                             long lastAppliedIndex = -1;
832                             long lastAppliedTerm = -1;
833
834                             ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
835                             if (!hasFollowers()) {
836                                 lastAppliedIndex = replicatedLogEntry.getIndex();
837                                 lastAppliedTerm = replicatedLogEntry.getTerm();
838                             } else if (lastAppliedEntry != null) {
839                                 lastAppliedIndex = lastAppliedEntry.getIndex();
840                                 lastAppliedTerm = lastAppliedEntry.getTerm();
841                             }
842
843                             if(LOG.isDebugEnabled()) {
844                                 LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
845                                 LOG.debug("{}: Snapshot Capture lastApplied:{} ",
846                                         persistenceId(), context.getLastApplied());
847                                 LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
848                                         lastAppliedIndex);
849                                 LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
850                                         lastAppliedTerm);
851                             }
852
853                             // send a CaptureSnapshot to self to make the expensive operation async.
854                             long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
855                             ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
856                             getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
857                                 (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
858                                 (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
859                                 null);
860                             context.setSnapshotCaptureInitiated(true);
861                         }
862                         if (callback != null){
863                             callback.apply(replicatedLogEntry);
864                         }
865                     }
866                 }
867             );
868         }
869
870     }
871
    /**
     * Event recording that in-memory log entries were removed starting at {@link #getFromIndex()}.
     * <p>
     * In this file it is handed to {@code persistence().persist(...)} by
     * {@code ReplicatedLogImpl.removeFromAndPersist}. That method passes the journal-relative (adjusted)
     * index, not the raw log index.
     * NOTE(review): confirm that the recovery path interprets {@code fromIndex} the same way.
     * <p>
     * This class is {@link Serializable}. If these events are stored with Java serialization (presumably —
     * confirm the journal's serializer config), then its field names and types, together with
     * {@code serialVersionUID}, define the stored form. Changing them can make previously written events
     * fail to deserialize or silently lose data.
     */
    static class DeleteEntries implements Serializable {
        private static final long serialVersionUID = 1L;
        // Index from which entries were removed (see the class note about adjusted vs. log index).
        private final int fromIndex;

        public DeleteEntries(int fromIndex) {
            this.fromIndex = fromIndex;
        }

        /** @return the index from which entries were removed */
        public int getFromIndex() {
            return fromIndex;
        }
    }
884
885
886     private class ElectionTermImpl implements ElectionTerm {
887         /**
888          * Identifier of the actor whose election term information this is
889          */
890         private long currentTerm = 0;
891         private String votedFor = null;
892
893         @Override
894         public long getCurrentTerm() {
895             return currentTerm;
896         }
897
898         @Override
899         public String getVotedFor() {
900             return votedFor;
901         }
902
903         @Override public void update(long currentTerm, String votedFor) {
904             if(LOG.isDebugEnabled()) {
905                 LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
906             }
907             this.currentTerm = currentTerm;
908             this.votedFor = votedFor;
909         }
910
911         @Override
912         public void updateAndPersist(long currentTerm, String votedFor){
913             update(currentTerm, votedFor);
914             // FIXME : Maybe first persist then update the state
915             persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
916
917                 @Override public void apply(UpdateElectionTerm param)
918                     throws Exception {
919
920                 }
921             });
922         }
923     }
924
    /**
     * Event recording an election term and the vote cast in it.
     * <p>
     * In this file it is handed to {@code persistence().persist(...)} by
     * {@code ElectionTermImpl.updateAndPersist}, after the in-memory term and vote have been updated.
     * <p>
     * This class is {@link Serializable}. If these events are stored with Java serialization (presumably —
     * confirm the journal's serializer config), then its field names and types, together with
     * {@code serialVersionUID}, define the stored form. Changing them can make previously written events
     * fail to deserialize or silently lose data.
     */
    static class UpdateElectionTerm implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long currentTerm;
        private final String votedFor;

        public UpdateElectionTerm(long currentTerm, String votedFor) {
            this.currentTerm = currentTerm;
            this.votedFor = votedFor;
        }

        /** @return the election term recorded by this event */
        public long getCurrentTerm() {
            return currentTerm;
        }

        /** @return the candidate voted for in {@link #getCurrentTerm()}; may be {@code null} */
        public String getVotedFor() {
            return votedFor;
        }
    }
943
944     protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
945
946         public NonPersistentRaftDataProvider(){
947
948         }
949
950         /**
951          * The way snapshotting works is,
952          * <ol>
953          * <li> RaftActor calls createSnapshot on the Shard
954          * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
955          * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
956          * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
957          * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
958          * in SaveSnapshotSuccess.
959          * </ol>
960          * @param o
961          */
962         @Override
963         public void saveSnapshot(Object o) {
964             // Make saving Snapshot successful
965             commitSnapshot(-1L);
966         }
967     }
968
969     @VisibleForTesting
970     void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
971         currentBehavior = behavior;
972     }
973
974     protected RaftActorBehavior getCurrentBehavior() {
975         return currentBehavior;
976     }
977
978 }