Refactor ElectionTermImpl into separate class
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.japi.Procedure;
14 import akka.persistence.RecoveryCompleted;
15 import akka.persistence.SaveSnapshotFailure;
16 import akka.persistence.SaveSnapshotSuccess;
17 import akka.persistence.SnapshotOffer;
18 import akka.persistence.SnapshotSelectionCriteria;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Objects;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Stopwatch;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Lists;
25 import java.io.Serializable;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.time.DurationFormatUtils;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
33 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
34 import org.opendaylight.controller.cluster.PersistentDataProvider;
35 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
36 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
37 import org.opendaylight.controller.cluster.notifications.RoleChanged;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
41 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
42 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
43 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
44 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
45 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
46 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
47 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
48 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
49 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
50 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
51 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
52 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
53 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
54 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * RaftActor encapsulates a state machine that needs to be kept synchronized
60  * in a cluster. It implements the RAFT algorithm as described in the paper
61  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
62  * In Search of an Understandable Consensus Algorithm</a>
63  * <p/>
64  * RaftActor has 3 states and each state has a certain behavior associated
65  * with it. A Raft actor can behave as,
66  * <ul>
67  * <li> A Leader </li>
68  * <li> A Follower (or) </li>
69  * <li> A Candidate </li>
70  * </ul>
71  * <p/>
72  * <p/>
73  * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
75  * a Leader it is also responsible for ensuring that all followers ultimately
76  * have the same log and therefore the same state machine as itself.
77  * <p/>
78  * <p/>
79  * The current behavior of a RaftActor determines how election for leadership
80  * is initiated and how peer RaftActors react to request for votes.
81  * <p/>
82  * <p/>
83  * Each RaftActor also needs to know the current election term. It uses this
84  * information for a couple of things. One is to simply figure out who it
85  * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
87  * <p/>
88  * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a Raft Actor determines
91  * <p/>
92  * <ul>
93  * <li> when a log entry should be persisted </li>
94  * <li> when a log entry should be applied to the state machine (and) </li>
95  * <li> when a snapshot should be saved </li>
96  * </ul>
97  */
98 public abstract class RaftActor extends AbstractUntypedPersistentActor {
99
    /**
     * If more than this much time elapses between an ApplyState message's start time and its
     * handling in handleCommand, a warning is logged.
     */
    private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis

    /**
     * Message sent to self by the non-persistent data provider (see setPersistence(boolean)) so that
     * the snapshot is committed via commitSnapshot(-1) outside of the snapshot-creation call.
     */
    private static final String COMMIT_SNAPSHOT = "commit_snapshot";

    // Created per instance with getClass(), so log output is attributed to the concrete subclass;
    // protected so that subclasses can log through it.
    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /**
     * The current state determines the current behavior of a RaftActor
     * A Raft Actor always starts off in the Follower State
     * <p>
     * This is null until recovery completes and initializeBehavior() installs a Follower
     * (postStop guards against the null case).
     */
    private RaftActorBehavior currentBehavior;

    /**
     * This context should NOT be passed directly to any other actor it is
     * only to be consumed by the RaftActorBehaviors
     */
    private final RaftActorContextImpl context;

    // Wrapper around whatever provider setPersistence(...) installs. It starts with a null delegate
    // and is handed to ElectionTermImpl in the constructor, so term information is always persisted
    // through the currently installed provider.
    private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);

    // Handed to the SnapshotManager when a CaptureSnapshot message is received.
    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();

    /**
     * The in-memory journal. Replaced with a new instance whenever a snapshot is recovered or applied.
     */
    private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();

    // Measures recovery time; started lazily by initRecoveryTimer() and cleared when recovery completes.
    private Stopwatch recoveryTimer;

    // Number of recovered journal entries appended to the recovery batch currently in progress.
    private int currentRecoveryBatchCount;

    // Captures the pre-transition behavior state so handleBehaviorChange can compare old vs new;
    // re-initialized on every transition rather than allocated each time.
    private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
132
133     public RaftActor(String id, Map<String, String> peerAddresses) {
134         this(id, peerAddresses, Optional.<ConfigParams>absent());
135     }
136
137     public RaftActor(String id, Map<String, String> peerAddresses,
138          Optional<ConfigParams> configParams) {
139
140         context = new RaftActorContextImpl(this.getSelf(),
141             this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
142             -1, -1, replicatedLog, peerAddresses,
143             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
144             LOG);
145     }
146
147     private void initRecoveryTimer() {
148         if(recoveryTimer == null) {
149             recoveryTimer = Stopwatch.createStarted();
150         }
151     }
152
153     @Override
154     public void preStart() throws Exception {
155         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
156                 context.getConfigParams().getJournalRecoveryLogBatchSize());
157
158         super.preStart();
159     }
160
161     @Override
162     public void postStop() {
163         if(currentBehavior != null) {
164             try {
165                 currentBehavior.close();
166             } catch (Exception e) {
167                 LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
168             }
169         }
170
171         super.postStop();
172     }
173
174     @Override
175     public void handleRecover(Object message) {
176         if(persistence().isRecoveryApplicable()) {
177             if (message instanceof SnapshotOffer) {
178                 onRecoveredSnapshot((SnapshotOffer) message);
179             } else if (message instanceof ReplicatedLogEntry) {
180                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
181             } else if (message instanceof ApplyLogEntries) {
182                 // Handle this message for backwards compatibility with pre-Lithium versions.
183                 onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
184             } else if (message instanceof ApplyJournalEntries) {
185                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
186             } else if (message instanceof DeleteEntries) {
187                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
188             } else if (message instanceof UpdateElectionTerm) {
189                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
190                         ((UpdateElectionTerm) message).getVotedFor());
191             } else if (message instanceof RecoveryCompleted) {
192                 onRecoveryCompletedMessage();
193             }
194         } else {
195             if (message instanceof RecoveryCompleted) {
196                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
197                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
198                 deleteMessages(lastSequenceNr());
199
200                 // Delete all the akka snapshots as they will not be needed
201                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
202
203                 onRecoveryComplete();
204
205                 initializeBehavior();
206             }
207         }
208     }
209
210     private void onRecoveredSnapshot(SnapshotOffer offer) {
211         if(LOG.isDebugEnabled()) {
212             LOG.debug("{}: SnapshotOffer called..", persistenceId());
213         }
214
215         initRecoveryTimer();
216
217         Snapshot snapshot = (Snapshot) offer.snapshot();
218
219         // Create a replicated log with the snapshot information
220         // The replicated log can be used later on to retrieve this snapshot
221         // when we need to install it on a peer
222         replicatedLog = new ReplicatedLogImpl(snapshot);
223
224         context.setReplicatedLog(replicatedLog);
225         context.setLastApplied(snapshot.getLastAppliedIndex());
226         context.setCommitIndex(snapshot.getLastAppliedIndex());
227
228         Stopwatch timer = Stopwatch.createStarted();
229
230         // Apply the snapshot to the actors state
231         applyRecoverySnapshot(snapshot.getState());
232
233         timer.stop();
234         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
235                 replicatedLog.size(), persistenceId(), timer.toString(),
236                 replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
237     }
238
239     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
240         if(LOG.isDebugEnabled()) {
241             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
242         }
243
244         replicatedLog.append(logEntry);
245     }
246
247     private void onRecoveredApplyLogEntries(long toIndex) {
248         if(LOG.isDebugEnabled()) {
249             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
250                     persistenceId(), context.getLastApplied() + 1, toIndex);
251         }
252
253         for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
254             batchRecoveredLogEntry(replicatedLog.get(i));
255         }
256
257         context.setLastApplied(toIndex);
258         context.setCommitIndex(toIndex);
259     }
260
261     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
262         initRecoveryTimer();
263
264         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
265         if(currentRecoveryBatchCount == 0) {
266             startLogRecoveryBatch(batchSize);
267         }
268
269         appendRecoveredLogEntry(logEntry.getData());
270
271         if(++currentRecoveryBatchCount >= batchSize) {
272             endCurrentLogRecoveryBatch();
273         }
274     }
275
276     private void endCurrentLogRecoveryBatch() {
277         applyCurrentLogRecoveryBatch();
278         currentRecoveryBatchCount = 0;
279     }
280
281     private void onRecoveryCompletedMessage() {
282         if(currentRecoveryBatchCount > 0) {
283             endCurrentLogRecoveryBatch();
284         }
285
286         onRecoveryComplete();
287
288         String recoveryTime = "";
289         if(recoveryTimer != null) {
290             recoveryTimer.stop();
291             recoveryTime = " in " + recoveryTimer.toString();
292             recoveryTimer = null;
293         }
294
295         LOG.info(
296             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
297                 "Persistence Id =  " + persistenceId() +
298                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
299                 "journal-size={}",
300             replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
301             replicatedLog.getSnapshotTerm(), replicatedLog.size());
302
303         initializeBehavior();
304     }
305
306     protected void initializeBehavior(){
307         changeCurrentBehavior(new Follower(context));
308     }
309
310     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
311         reusableBehaviorStateHolder.init(currentBehavior);
312         currentBehavior = newBehavior;
313         handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
314     }
315
316     @Override public void handleCommand(Object message) {
317         if (message instanceof ApplyState){
318             ApplyState applyState = (ApplyState) message;
319
320             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
321             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
322                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
323                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
324             }
325
326             if(LOG.isDebugEnabled()) {
327                 LOG.debug("{}: Applying state for log index {} data {}",
328                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
329                     applyState.getReplicatedLogEntry().getData());
330             }
331
332             applyState(applyState.getClientActor(), applyState.getIdentifier(),
333                 applyState.getReplicatedLogEntry().getData());
334
335         } else if (message instanceof ApplyJournalEntries){
336             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
337             if(LOG.isDebugEnabled()) {
338                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
339             }
340
341             persistence().persist(applyEntries, NoopProcedure.instance());
342
343         } else if(message instanceof ApplySnapshot ) {
344             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
345
346             if(LOG.isDebugEnabled()) {
347                 LOG.debug("{}: ApplySnapshot called on Follower Actor " +
348                         "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
349                     snapshot.getLastAppliedTerm()
350                 );
351             }
352
353             applySnapshot(snapshot.getState());
354
355             //clears the followers log, sets the snapshot index to ensure adjusted-index works
356             replicatedLog = new ReplicatedLogImpl(snapshot);
357             context.setReplicatedLog(replicatedLog);
358             context.setLastApplied(snapshot.getLastAppliedIndex());
359
360         } else if (message instanceof FindLeader) {
361             getSender().tell(
362                 new FindLeaderReply(getLeaderAddress()),
363                 getSelf()
364             );
365
366         } else if (message instanceof SaveSnapshotSuccess) {
367             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
368             LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
369
370             long sequenceNumber = success.metadata().sequenceNr();
371
372             commitSnapshot(sequenceNumber);
373
374         } else if (message instanceof SaveSnapshotFailure) {
375             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
376
377             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
378                     persistenceId(), saveSnapshotFailure.cause());
379
380             context.getSnapshotManager().rollback();
381
382         } else if (message instanceof CaptureSnapshot) {
383             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
384
385             context.getSnapshotManager().create(createSnapshotProcedure);
386
387         } else if (message instanceof CaptureSnapshotReply) {
388             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
389         } else if(message instanceof GetOnDemandRaftState) {
390             onGetOnDemandRaftStats();
391         } else if (message.equals(COMMIT_SNAPSHOT)) {
392             commitSnapshot(-1);
393         } else {
394             reusableBehaviorStateHolder.init(currentBehavior);
395
396             currentBehavior = currentBehavior.handleMessage(getSender(), message);
397
398             handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
399         }
400     }
401
402     private void onGetOnDemandRaftStats() {
403         // Debugging message to retrieve raft stats.
404
405         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
406                 .commitIndex(context.getCommitIndex())
407                 .currentTerm(context.getTermInformation().getCurrentTerm())
408                 .inMemoryJournalDataSize(replicatedLog.dataSize())
409                 .inMemoryJournalLogSize(replicatedLog.size())
410                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
411                 .lastApplied(context.getLastApplied())
412                 .lastIndex(replicatedLog.lastIndex())
413                 .lastTerm(replicatedLog.lastTerm())
414                 .leader(getLeaderId())
415                 .raftState(currentBehavior.state().toString())
416                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
417                 .snapshotIndex(replicatedLog.getSnapshotIndex())
418                 .snapshotTerm(replicatedLog.getSnapshotTerm())
419                 .votedFor(context.getTermInformation().getVotedFor())
420                 .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
421
422         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
423         if (lastLogEntry != null) {
424             builder.lastLogIndex(lastLogEntry.getIndex());
425             builder.lastLogTerm(lastLogEntry.getTerm());
426         }
427
428         if(currentBehavior instanceof AbstractLeader) {
429             AbstractLeader leader = (AbstractLeader)currentBehavior;
430             Collection<String> followerIds = leader.getFollowerIds();
431             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
432             for(String id: followerIds) {
433                 final FollowerLogInformation info = leader.getFollower(id);
434                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
435                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
436             }
437
438             builder.followerInfoList(followerInfoList);
439         }
440
441         sender().tell(builder.build(), self());
442
443     }
444
445     private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
446         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
447
448         if (oldBehavior != currentBehavior){
449             onStateChanged();
450         }
451
452         String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
453         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
454
455         // it can happen that the state has not changed but the leader has changed.
456         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
457         if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
458             if(roleChangeNotifier.isPresent()) {
459                 roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
460             }
461
462             onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
463         }
464
465         if (roleChangeNotifier.isPresent() &&
466                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
467             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
468                     currentBehavior.state().name()), getSelf());
469         }
470     }
471
472     /**
473      * When a derived RaftActor needs to persist something it must call
474      * persistData.
475      *
476      * @param clientActor
477      * @param identifier
478      * @param data
479      */
480     protected void persistData(final ActorRef clientActor, final String identifier,
481         final Payload data) {
482
483         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
484             context.getReplicatedLog().lastIndex() + 1,
485             context.getTermInformation().getCurrentTerm(), data);
486
487         if(LOG.isDebugEnabled()) {
488             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
489         }
490
491         final RaftActorContext raftContext = getRaftActorContext();
492
493         replicatedLog
494                 .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
495                     @Override
496                     public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
497                         if(!hasFollowers()){
498                             // Increment the Commit Index and the Last Applied values
499                             raftContext.setCommitIndex(replicatedLogEntry.getIndex());
500                             raftContext.setLastApplied(replicatedLogEntry.getIndex());
501
502                             // Apply the state immediately
503                             applyState(clientActor, identifier, data);
504
505                             // Send a ApplyJournalEntries message so that we write the fact that we applied
506                             // the state to durable storage
507                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
508
509                             context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
510
511                         } else if (clientActor != null) {
512                             // Send message for replication
513                             currentBehavior.handleMessage(getSelf(),
514                                     new Replicate(clientActor, identifier,
515                                             replicatedLogEntry)
516                             );
517                         }
518
519                     }
520                 });    }
521
522     protected String getId() {
523         return context.getId();
524     }
525
526     /**
527      * Derived actors can call the isLeader method to check if the current
528      * RaftActor is the Leader or not
529      *
530      * @return true it this RaftActor is a Leader false otherwise
531      */
532     protected boolean isLeader() {
533         return context.getId().equals(currentBehavior.getLeaderId());
534     }
535
536     /**
537      * Derived actor can call getLeader if they need a reference to the Leader.
538      * This would be useful for example in forwarding a request to an actor
539      * which is the leader
540      *
541      * @return A reference to the leader if known, null otherwise
542      */
543     protected ActorSelection getLeader(){
544         String leaderAddress = getLeaderAddress();
545
546         if(leaderAddress == null){
547             return null;
548         }
549
550         return context.actorSelection(leaderAddress);
551     }
552
553     /**
554      *
555      * @return the current leader's id
556      */
557     protected String getLeaderId(){
558         return currentBehavior.getLeaderId();
559     }
560
561     protected RaftState getRaftState() {
562         return currentBehavior.state();
563     }
564
565     protected ReplicatedLogEntry getLastLogEntry() {
566         return replicatedLog.last();
567     }
568
569     protected Long getCurrentTerm(){
570         return context.getTermInformation().getCurrentTerm();
571     }
572
573     protected Long getCommitIndex(){
574         return context.getCommitIndex();
575     }
576
577     protected Long getLastApplied(){
578         return context.getLastApplied();
579     }
580
581     protected RaftActorContext getRaftActorContext() {
582         return context;
583     }
584
585     protected void updateConfigParams(ConfigParams configParams) {
586         context.setConfigParams(configParams);
587     }
588
589     public final DataPersistenceProvider persistence() {
590         return delegatingPersistenceProvider.getDelegate();
591     }
592
593     public void setPersistence(DataPersistenceProvider provider) {
594         delegatingPersistenceProvider.setDelegate(provider);
595     }
596
597     protected void setPersistence(boolean persistent) {
598         if(persistent) {
599             setPersistence(new PersistentDataProvider(this));
600         } else {
601             setPersistence(new NonPersistentDataProvider() {
602                 /**
603                  * The way snapshotting works is,
604                  * <ol>
605                  * <li> RaftActor calls createSnapshot on the Shard
606                  * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
607                  * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save
608                  * the snapshot. The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the
609                  * RaftActor gets SaveSnapshot success it commits the snapshot to the in-memory journal. This
610                  * commitSnapshot is mimicking what is done in SaveSnapshotSuccess.
611                  * </ol>
612                  */
613                 @Override
614                 public void saveSnapshot(Object o) {
615                     // Make saving Snapshot successful
616                     // Committing the snapshot here would end up calling commit in the creating state which would
617                     // be a state violation. That's why now we send a message to commit the snapshot.
618                     self().tell(COMMIT_SNAPSHOT, self());
619                 }
620             });
621         }
622     }
623
624     /**
625      * setPeerAddress sets the address of a known peer at a later time.
626      * <p>
627      * This is to account for situations where a we know that a peer
628      * exists but we do not know an address up-front. This may also be used in
629      * situations where a known peer starts off in a different location and we
630      * need to change it's address
631      * <p>
632      * Note that if the peerId does not match the list of peers passed to
633      * this actor during construction an IllegalStateException will be thrown.
634      *
635      * @param peerId
636      * @param peerAddress
637      */
638     protected void setPeerAddress(String peerId, String peerAddress){
639         context.setPeerAddress(peerId, peerAddress);
640     }
641
642     protected void commitSnapshot(long sequenceNumber) {
643         context.getSnapshotManager().commit(persistence(), sequenceNumber);
644     }
645
646     /**
647      * The applyState method will be called by the RaftActor when some data
648      * needs to be applied to the actor's state
649      *
650      * @param clientActor A reference to the client who sent this message. This
651      *                    is the same reference that was passed to persistData
652      *                    by the derived actor. clientActor may be null when
653      *                    the RaftActor is behaving as a follower or during
654      *                    recovery.
655      * @param identifier  The identifier of the persisted data. This is also
656      *                    the same identifier that was passed to persistData by
657      *                    the derived actor. identifier may be null when
658      *                    the RaftActor is behaving as a follower or during
659      *                    recovery
660      * @param data        A piece of data that was persisted by the persistData call.
661      *                    This should NEVER be null.
662      */
663     protected abstract void applyState(ActorRef clientActor, String identifier,
664         Object data);
665
666     /**
667      * This method is called during recovery at the start of a batch of state entries. Derived
668      * classes should perform any initialization needed to start a batch.
669      */
670     protected abstract void startLogRecoveryBatch(int maxBatchSize);
671
672     /**
673      * This method is called during recovery to append state data to the current batch. This method
674      * is called 1 or more times after {@link #startLogRecoveryBatch}.
675      *
676      * @param data the state data
677      */
678     protected abstract void appendRecoveredLogEntry(Payload data);
679
680     /**
681      * This method is called during recovery to reconstruct the state of the actor.
682      *
683      * @param snapshotBytes A snapshot of the state of the actor
684      */
685     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
686
687     /**
688      * This method is called during recovery at the end of a batch to apply the current batched
689      * log entries. This method is called after {@link #appendRecoveredLogEntry}.
690      */
691     protected abstract void applyCurrentLogRecoveryBatch();
692
693     /**
694      * This method is called when recovery is complete.
695      */
696     protected abstract void onRecoveryComplete();
697
698     /**
699      * This method will be called by the RaftActor when a snapshot needs to be
700      * created. The derived actor should respond with its current state.
701      * <p/>
702      * During recovery the state that is returned by the derived actor will
703      * be passed back to it by calling the applySnapshot  method
704      *
705      * @return The current state of the actor
706      */
707     protected abstract void createSnapshot();
708
709     /**
710      * This method can be called at any other point during normal
711      * operations when the derived actor is out of sync with it's peers
712      * and the only way to bring it in sync is by applying a snapshot
713      *
714      * @param snapshotBytes A snapshot of the state of the actor
715      */
716     protected abstract void applySnapshot(byte[] snapshotBytes);
717
718     /**
719      * This method will be called by the RaftActor when the state of the
720      * RaftActor changes. The derived actor can then use methods like
721      * isLeader or getLeader to do something useful
722      */
723     protected abstract void onStateChanged();
724
725     /**
726      * Notifier Actor for this RaftActor to notify when a role change happens
727      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
728      */
729     protected abstract Optional<ActorRef> getRoleChangeNotifier();
730
731     protected void onLeaderChanged(String oldLeader, String newLeader){};
732
733     private String getLeaderAddress(){
734         if(isLeader()){
735             return getSelf().path().toString();
736         }
737         String leaderId = currentBehavior.getLeaderId();
738         if (leaderId == null) {
739             return null;
740         }
741         String peerAddress = context.getPeerAddress(leaderId);
742         if(LOG.isDebugEnabled()) {
743             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
744                     persistenceId(), leaderId, peerAddress);
745         }
746
747         return peerAddress;
748     }
749
750     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
751         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
752
753         context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
754     }
755
756     protected long getTotalMemory() {
757         return Runtime.getRuntime().totalMemory();
758     }
759
760     protected boolean hasFollowers(){
761         return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
762     }
763
    /**
     * Replicated log whose mutations are also written through the actor's persistence provider
     * ({@code persistence().persist(...)}). It also decides when to ask the snapshot manager to
     * capture a snapshot, based on the number of entries logged and the accumulated data size.
     */
    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
        // Without followers, the accumulated data size is divided by this factor before it is
        // compared against the memory threshold, so snapshots are taken less often.
        private static final int DATA_SIZE_DIVIDER = 5;
        // Sum of ReplicatedLogEntry.size() for entries persisted since the last snapshot capture
        // was started. It is reset to 0 when SnapshotManager.capture() returns true.
        private long dataSizeSinceLastSnapshot = 0L;


        /**
         * Creates a log seeded from the given snapshot's last applied index and term and its
         * unapplied entries.
         *
         * @param snapshot the snapshot to seed the log from
         */
        public ReplicatedLogImpl(Snapshot snapshot) {
            super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                snapshot.getUnAppliedEntries());
        }

        /**
         * Creates a log using the superclass's default initial state.
         */
        public ReplicatedLogImpl() {
            super();
        }

        /**
         * Removes the in-memory journal entries from {@code logEntryIndex} onward and persists a
         * {@link DeleteEntries} event carrying the adjusted index. It does nothing when the
         * adjusted index is negative.
         *
         * @param logEntryIndex the log index of the first entry to remove
         */
        @Override public void removeFromAndPersist(long logEntryIndex) {
            int adjustedIndex = adjustedIndex(logEntryIndex);

            if (adjustedIndex < 0) {
                return;
            }

            // FIXME: Maybe this should be done after the command is saved
            journal.subList(adjustedIndex , journal.size()).clear();

            persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {

                @Override
                public void apply(DeleteEntries param)
                        throws Exception {
                    //FIXME : Doing nothing for now
                    // NOTE(review): despite the FIXME above, this callback does recompute
                    // dataSize from the entries left in the in-memory journal.
                    dataSize = 0;
                    for (ReplicatedLogEntry entry : journal) {
                        dataSize += entry.size();
                    }
                }
            });
        }

        /**
         * Appends the entry and persists it, with no completion callback.
         *
         * @param replicatedLogEntry the entry to append
         */
        @Override public void appendAndPersist(
            final ReplicatedLogEntry replicatedLogEntry) {
            appendAndPersist(replicatedLogEntry, null);
        }

        /**
         * Appends the entry to the in-memory journal and persists it. Once the persist callback
         * runs, the data-size bookkeeping is updated, a snapshot capture may be triggered, and
         * then the optional callback is invoked.
         *
         * @param replicatedLogEntry the entry to append
         * @param callback invoked with the entry after it is persisted; may be {@code null}
         */
        public void appendAndPersist(
            final ReplicatedLogEntry replicatedLogEntry,
            final Procedure<ReplicatedLogEntry> callback)  {

            if(LOG.isDebugEnabled()) {
                LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
            }

            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
            journal.add(replicatedLogEntry);

            // When persisting events with persist it is guaranteed that the
            // persistent actor will not receive further commands between the
            // persist call and the execution(s) of the associated event
            // handler. This also holds for multiple persist calls in context
            // of a single command.
            persistence().persist(replicatedLogEntry,
                new Procedure<ReplicatedLogEntry>() {
                    @Override
                    public void apply(ReplicatedLogEntry evt) throws Exception {
                        int logEntrySize = replicatedLogEntry.size();

                        dataSize += logEntrySize;
                        long dataSizeForCheck = dataSize;

                        dataSizeSinceLastSnapshot += logEntrySize;

                        if (!hasFollowers()) {
                            // When we do not have followers we do not maintain an in-memory log
                            // due to this the journalSize will never become anything close to the
                            // snapshot batch count. In fact will mostly be 1.
                            // Similarly since the journal's dataSize depends on the entries in the
                            // journal the journal's dataSize will never reach a value close to the
                            // memory threshold.
                            // By maintaining the dataSize outside the journal we are tracking essentially
                            // what we have written to the disk however since we no longer are in
                            // need of doing a snapshot just for the sake of freeing up memory we adjust
                            // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
                            // as if we were maintaining a real snapshot
                            dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                        }
                        // Derived from the entry's log index, not from the in-memory journal length
                        // (index + 1, assuming 0-based log indices).
                        long journalSize = replicatedLogEntry.getIndex() + 1;
                        // Threshold = configured percentage of getTotalMemory().
                        long dataThreshold = getTotalMemory() *
                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;

                        // Try to snapshot every snapshotBatchCount entries, or when the data
                        // size exceeds the threshold. NOTE(review): a snapshotBatchCount of 0
                        // would throw ArithmeticException here -- confirm config validation.
                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
                                || dataSizeForCheck > dataThreshold)) {

                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
                                    currentBehavior.getReplicatedToAllIndex());

                            // Reset the running total only if a capture was actually started.
                            if(started){
                                dataSizeSinceLastSnapshot = 0;
                            }

                        }

                        if (callback != null){
                            callback.apply(replicatedLogEntry);
                        }
                    }
                }
            );
        }

    }
873
874     static class DeleteEntries implements Serializable {
875         private static final long serialVersionUID = 1L;
876         private final int fromIndex;
877
878         public DeleteEntries(int fromIndex) {
879             this.fromIndex = fromIndex;
880         }
881
882         public int getFromIndex() {
883             return fromIndex;
884         }
885     }
886
887     static class UpdateElectionTerm implements Serializable {
888         private static final long serialVersionUID = 1L;
889         private final long currentTerm;
890         private final String votedFor;
891
892         public UpdateElectionTerm(long currentTerm, String votedFor) {
893             this.currentTerm = currentTerm;
894             this.votedFor = votedFor;
895         }
896
897         public long getCurrentTerm() {
898             return currentTerm;
899         }
900
901         public String getVotedFor() {
902             return votedFor;
903         }
904     }
905
906     private class CreateSnapshotProcedure implements Procedure<Void> {
907
908         @Override
909         public void apply(Void aVoid) throws Exception {
910             createSnapshot();
911         }
912     }
913
914     @VisibleForTesting
915     void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
916         currentBehavior = behavior;
917     }
918
919     protected RaftActorBehavior getCurrentBehavior() {
920         return currentBehavior;
921     }
922
923     private static class BehaviorStateHolder {
924         private RaftActorBehavior behavior;
925         private String leaderId;
926
927         void init(RaftActorBehavior behavior) {
928             this.behavior = behavior;
929             this.leaderId = behavior != null ? behavior.getLeaderId() : null;
930         }
931
932         RaftActorBehavior getBehavior() {
933             return behavior;
934         }
935
936         String getLeaderId() {
937             return leaderId;
938         }
939     }
940 }