Remove RaftActor#trimPersistentData method
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.japi.Procedure;
14 import akka.persistence.RecoveryCompleted;
15 import akka.persistence.SaveSnapshotFailure;
16 import akka.persistence.SaveSnapshotSuccess;
17 import akka.persistence.SnapshotOffer;
18 import akka.persistence.SnapshotSelectionCriteria;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.base.Objects;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Stopwatch;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Lists;
25 import java.io.Serializable;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang3.time.DurationFormatUtils;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
33 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
34 import org.opendaylight.controller.cluster.notifications.RoleChanged;
35 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
36 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
39 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
41 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
42 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
43 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
44 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
45 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
46 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
47 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
48 import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
49 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
50 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
51 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * RaftActor encapsulates a state machine that needs to be kept synchronized
57  * in a cluster. It implements the RAFT algorithm as described in the paper
58  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
59  * In Search of an Understandable Consensus Algorithm</a>
60  * <p/>
61  * RaftActor has 3 states and each state has a certain behavior associated
62  * with it. A Raft actor can behave as,
63  * <ul>
64  * <li> A Leader </li>
65  * <li> A Follower (or) </li>
66  * <li> A Candidate </li>
67  * </ul>
68  * <p/>
69  * <p/>
 * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
 * a Leader it is also responsible for ensuring that all followers ultimately
 * have the same log and therefore the same state machine as itself.
74  * <p/>
75  * <p/>
76  * The current behavior of a RaftActor determines how election for leadership
77  * is initiated and how peer RaftActors react to request for votes.
78  * <p/>
79  * <p/>
 * Each RaftActor also needs to know the current election term. It uses this
 * information for a couple of things. One is to simply figure out who it
 * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
 * <p/>
 * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a RaftActor determines
88  * <p/>
89  * <ul>
90  * <li> when a log entry should be persisted </li>
91  * <li> when a log entry should be applied to the state machine (and) </li>
92  * <li> when a snapshot should be saved </li>
93  * </ul>
94  */
95 public abstract class RaftActor extends AbstractUntypedPersistentActor {
96
97     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
98
99     private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
100             new Procedure<ApplyJournalEntries>() {
101                 @Override
102                 public void apply(ApplyJournalEntries param) throws Exception {
103                 }
104             };
105     private static final String COMMIT_SNAPSHOT = "commit_snapshot";
106
107     protected final Logger LOG = LoggerFactory.getLogger(getClass());
108
109     /**
110      * The current state determines the current behavior of a RaftActor
111      * A Raft Actor always starts off in the Follower State
112      */
113     private RaftActorBehavior currentBehavior;
114
115     /**
116      * This context should NOT be passed directly to any other actor it is
117      * only to be consumed by the RaftActorBehaviors
118      */
119     private final RaftActorContextImpl context;
120
121     private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
122
123     /**
124      * The in-memory journal
125      */
126     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
127
128     private Stopwatch recoveryTimer;
129
130     private int currentRecoveryBatchCount;
131
132     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
133
134     public RaftActor(String id, Map<String, String> peerAddresses) {
135         this(id, peerAddresses, Optional.<ConfigParams>absent());
136     }
137
138     public RaftActor(String id, Map<String, String> peerAddresses,
139          Optional<ConfigParams> configParams) {
140
141         context = new RaftActorContextImpl(this.getSelf(),
142             this.getContext(), id, new ElectionTermImpl(),
143             -1, -1, replicatedLog, peerAddresses,
144             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
145             LOG);
146     }
147
148     private void initRecoveryTimer() {
149         if(recoveryTimer == null) {
150             recoveryTimer = Stopwatch.createStarted();
151         }
152     }
153
154     @Override
155     public void preStart() throws Exception {
156         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
157                 context.getConfigParams().getJournalRecoveryLogBatchSize());
158
159         super.preStart();
160     }
161
162     @Override
163     public void postStop() {
164         if(currentBehavior != null) {
165             try {
166                 currentBehavior.close();
167             } catch (Exception e) {
168                 LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
169             }
170         }
171
172         super.postStop();
173     }
174
175     @Override
176     public void handleRecover(Object message) {
177         if(persistence().isRecoveryApplicable()) {
178             if (message instanceof SnapshotOffer) {
179                 onRecoveredSnapshot((SnapshotOffer) message);
180             } else if (message instanceof ReplicatedLogEntry) {
181                 onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
182             } else if (message instanceof ApplyLogEntries) {
183                 // Handle this message for backwards compatibility with pre-Lithium versions.
184                 onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
185             } else if (message instanceof ApplyJournalEntries) {
186                 onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
187             } else if (message instanceof DeleteEntries) {
188                 replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
189             } else if (message instanceof UpdateElectionTerm) {
190                 context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
191                         ((UpdateElectionTerm) message).getVotedFor());
192             } else if (message instanceof RecoveryCompleted) {
193                 onRecoveryCompletedMessage();
194             }
195         } else {
196             if (message instanceof RecoveryCompleted) {
197                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
198                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
199                 deleteMessages(lastSequenceNr());
200
201                 // Delete all the akka snapshots as they will not be needed
202                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
203
204                 onRecoveryComplete();
205
206                 initializeBehavior();
207             }
208         }
209     }
210
211     private void onRecoveredSnapshot(SnapshotOffer offer) {
212         if(LOG.isDebugEnabled()) {
213             LOG.debug("{}: SnapshotOffer called..", persistenceId());
214         }
215
216         initRecoveryTimer();
217
218         Snapshot snapshot = (Snapshot) offer.snapshot();
219
220         // Create a replicated log with the snapshot information
221         // The replicated log can be used later on to retrieve this snapshot
222         // when we need to install it on a peer
223         replicatedLog = new ReplicatedLogImpl(snapshot);
224
225         context.setReplicatedLog(replicatedLog);
226         context.setLastApplied(snapshot.getLastAppliedIndex());
227         context.setCommitIndex(snapshot.getLastAppliedIndex());
228
229         Stopwatch timer = Stopwatch.createStarted();
230
231         // Apply the snapshot to the actors state
232         applyRecoverySnapshot(snapshot.getState());
233
234         timer.stop();
235         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
236                 replicatedLog.size(), persistenceId(), timer.toString(),
237                 replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
238     }
239
240     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
241         if(LOG.isDebugEnabled()) {
242             LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
243         }
244
245         replicatedLog.append(logEntry);
246     }
247
248     private void onRecoveredApplyLogEntries(long toIndex) {
249         if(LOG.isDebugEnabled()) {
250             LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
251                     persistenceId(), context.getLastApplied() + 1, toIndex);
252         }
253
254         for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
255             batchRecoveredLogEntry(replicatedLog.get(i));
256         }
257
258         context.setLastApplied(toIndex);
259         context.setCommitIndex(toIndex);
260     }
261
262     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
263         initRecoveryTimer();
264
265         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
266         if(currentRecoveryBatchCount == 0) {
267             startLogRecoveryBatch(batchSize);
268         }
269
270         appendRecoveredLogEntry(logEntry.getData());
271
272         if(++currentRecoveryBatchCount >= batchSize) {
273             endCurrentLogRecoveryBatch();
274         }
275     }
276
277     private void endCurrentLogRecoveryBatch() {
278         applyCurrentLogRecoveryBatch();
279         currentRecoveryBatchCount = 0;
280     }
281
282     private void onRecoveryCompletedMessage() {
283         if(currentRecoveryBatchCount > 0) {
284             endCurrentLogRecoveryBatch();
285         }
286
287         onRecoveryComplete();
288
289         String recoveryTime = "";
290         if(recoveryTimer != null) {
291             recoveryTimer.stop();
292             recoveryTime = " in " + recoveryTimer.toString();
293             recoveryTimer = null;
294         }
295
296         LOG.info(
297             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
298                 "Persistence Id =  " + persistenceId() +
299                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
300                 "journal-size={}",
301             replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
302             replicatedLog.getSnapshotTerm(), replicatedLog.size());
303
304         initializeBehavior();
305     }
306
307     protected void initializeBehavior(){
308         changeCurrentBehavior(new Follower(context));
309     }
310
311     protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
312         reusableBehaviorStateHolder.init(currentBehavior);
313         currentBehavior = newBehavior;
314         handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
315     }
316
317     @Override public void handleCommand(Object message) {
318         if (message instanceof ApplyState){
319             ApplyState applyState = (ApplyState) message;
320
321             long elapsedTime = (System.nanoTime() - applyState.getStartTime());
322             if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
323                 LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
324                         TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
325             }
326
327             if(LOG.isDebugEnabled()) {
328                 LOG.debug("{}: Applying state for log index {} data {}",
329                     persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
330                     applyState.getReplicatedLogEntry().getData());
331             }
332
333             applyState(applyState.getClientActor(), applyState.getIdentifier(),
334                 applyState.getReplicatedLogEntry().getData());
335
336         } else if (message instanceof ApplyJournalEntries){
337             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
338             if(LOG.isDebugEnabled()) {
339                 LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
340             }
341
342             persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
343
344         } else if(message instanceof ApplySnapshot ) {
345             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
346
347             if(LOG.isDebugEnabled()) {
348                 LOG.debug("{}: ApplySnapshot called on Follower Actor " +
349                         "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
350                     snapshot.getLastAppliedTerm()
351                 );
352             }
353
354             applySnapshot(snapshot.getState());
355
356             //clears the followers log, sets the snapshot index to ensure adjusted-index works
357             replicatedLog = new ReplicatedLogImpl(snapshot);
358             context.setReplicatedLog(replicatedLog);
359             context.setLastApplied(snapshot.getLastAppliedIndex());
360
361         } else if (message instanceof FindLeader) {
362             getSender().tell(
363                 new FindLeaderReply(getLeaderAddress()),
364                 getSelf()
365             );
366
367         } else if (message instanceof SaveSnapshotSuccess) {
368             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
369             LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
370
371             long sequenceNumber = success.metadata().sequenceNr();
372
373             commitSnapshot(sequenceNumber);
374
375         } else if (message instanceof SaveSnapshotFailure) {
376             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
377
378             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
379                     persistenceId(), saveSnapshotFailure.cause());
380
381             context.getSnapshotManager().rollback();
382
383         } else if (message instanceof CaptureSnapshot) {
384             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
385
386             context.getSnapshotManager().create(createSnapshotProcedure);
387
388         } else if (message instanceof CaptureSnapshotReply) {
389             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
390         } else if(message instanceof GetOnDemandRaftState) {
391             onGetOnDemandRaftStats();
392         } else if (message.equals(COMMIT_SNAPSHOT)) {
393             commitSnapshot(-1);
394         } else {
395             reusableBehaviorStateHolder.init(currentBehavior);
396
397             currentBehavior = currentBehavior.handleMessage(getSender(), message);
398
399             handleBehaviorChange(reusableBehaviorStateHolder, currentBehavior);
400         }
401     }
402
403     private void onGetOnDemandRaftStats() {
404         // Debugging message to retrieve raft stats.
405
406         OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
407                 .commitIndex(context.getCommitIndex())
408                 .currentTerm(context.getTermInformation().getCurrentTerm())
409                 .inMemoryJournalDataSize(replicatedLog.dataSize())
410                 .inMemoryJournalLogSize(replicatedLog.size())
411                 .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
412                 .lastApplied(context.getLastApplied())
413                 .lastIndex(replicatedLog.lastIndex())
414                 .lastTerm(replicatedLog.lastTerm())
415                 .leader(getLeaderId())
416                 .raftState(currentBehavior.state().toString())
417                 .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
418                 .snapshotIndex(replicatedLog.getSnapshotIndex())
419                 .snapshotTerm(replicatedLog.getSnapshotTerm())
420                 .votedFor(context.getTermInformation().getVotedFor())
421                 .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
422
423         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
424         if (lastLogEntry != null) {
425             builder.lastLogIndex(lastLogEntry.getIndex());
426             builder.lastLogTerm(lastLogEntry.getTerm());
427         }
428
429         if(currentBehavior instanceof AbstractLeader) {
430             AbstractLeader leader = (AbstractLeader)currentBehavior;
431             Collection<String> followerIds = leader.getFollowerIds();
432             List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
433             for(String id: followerIds) {
434                 final FollowerLogInformation info = leader.getFollower(id);
435                 followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
436                         info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
437             }
438
439             builder.followerInfoList(followerInfoList);
440         }
441
442         sender().tell(builder.build(), self());
443
444     }
445
446     private void handleBehaviorChange(BehaviorStateHolder oldBehaviorState, RaftActorBehavior currentBehavior) {
447         RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior();
448
449         if (oldBehavior != currentBehavior){
450             onStateChanged();
451         }
452
453         String oldBehaviorLeaderId = oldBehavior == null ? null : oldBehaviorState.getLeaderId();
454         String oldBehaviorStateName = oldBehavior == null ? null : oldBehavior.state().name();
455
456         // it can happen that the state has not changed but the leader has changed.
457         Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
458         if(!Objects.equal(oldBehaviorLeaderId, currentBehavior.getLeaderId())) {
459             if(roleChangeNotifier.isPresent()) {
460                 roleChangeNotifier.get().tell(new LeaderStateChanged(getId(), currentBehavior.getLeaderId()), getSelf());
461             }
462
463             onLeaderChanged(oldBehaviorLeaderId, currentBehavior.getLeaderId());
464         }
465
466         if (roleChangeNotifier.isPresent() &&
467                 (oldBehavior == null || (oldBehavior.state() != currentBehavior.state()))) {
468             roleChangeNotifier.get().tell(new RoleChanged(getId(), oldBehaviorStateName ,
469                     currentBehavior.state().name()), getSelf());
470         }
471     }
472
473     /**
474      * When a derived RaftActor needs to persist something it must call
475      * persistData.
476      *
477      * @param clientActor
478      * @param identifier
479      * @param data
480      */
481     protected void persistData(final ActorRef clientActor, final String identifier,
482         final Payload data) {
483
484         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
485             context.getReplicatedLog().lastIndex() + 1,
486             context.getTermInformation().getCurrentTerm(), data);
487
488         if(LOG.isDebugEnabled()) {
489             LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
490         }
491
492         final RaftActorContext raftContext = getRaftActorContext();
493
494         replicatedLog
495                 .appendAndPersist(replicatedLogEntry, new Procedure<ReplicatedLogEntry>() {
496                     @Override
497                     public void apply(ReplicatedLogEntry replicatedLogEntry) throws Exception {
498                         if(!hasFollowers()){
499                             // Increment the Commit Index and the Last Applied values
500                             raftContext.setCommitIndex(replicatedLogEntry.getIndex());
501                             raftContext.setLastApplied(replicatedLogEntry.getIndex());
502
503                             // Apply the state immediately
504                             applyState(clientActor, identifier, data);
505
506                             // Send a ApplyJournalEntries message so that we write the fact that we applied
507                             // the state to durable storage
508                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
509
510                             context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
511
512                         } else if (clientActor != null) {
513                             // Send message for replication
514                             currentBehavior.handleMessage(getSelf(),
515                                     new Replicate(clientActor, identifier,
516                                             replicatedLogEntry)
517                             );
518                         }
519
520                     }
521                 });    }
522
523     protected String getId() {
524         return context.getId();
525     }
526
527     /**
528      * Derived actors can call the isLeader method to check if the current
529      * RaftActor is the Leader or not
530      *
531      * @return true it this RaftActor is a Leader false otherwise
532      */
533     protected boolean isLeader() {
534         return context.getId().equals(currentBehavior.getLeaderId());
535     }
536
537     /**
538      * Derived actor can call getLeader if they need a reference to the Leader.
539      * This would be useful for example in forwarding a request to an actor
540      * which is the leader
541      *
542      * @return A reference to the leader if known, null otherwise
543      */
544     protected ActorSelection getLeader(){
545         String leaderAddress = getLeaderAddress();
546
547         if(leaderAddress == null){
548             return null;
549         }
550
551         return context.actorSelection(leaderAddress);
552     }
553
554     /**
555      *
556      * @return the current leader's id
557      */
558     protected String getLeaderId(){
559         return currentBehavior.getLeaderId();
560     }
561
562     protected RaftState getRaftState() {
563         return currentBehavior.state();
564     }
565
566     protected ReplicatedLogEntry getLastLogEntry() {
567         return replicatedLog.last();
568     }
569
570     protected Long getCurrentTerm(){
571         return context.getTermInformation().getCurrentTerm();
572     }
573
574     protected Long getCommitIndex(){
575         return context.getCommitIndex();
576     }
577
578     protected Long getLastApplied(){
579         return context.getLastApplied();
580     }
581
582     protected RaftActorContext getRaftActorContext() {
583         return context;
584     }
585
586     protected void updateConfigParams(ConfigParams configParams) {
587         context.setConfigParams(configParams);
588     }
589
590     /**
     * setPeerAddress sets the address of a known peer at a later time.
     * <p>
     * This is to account for situations where we know that a peer
     * exists but we do not know an address up-front. This may also be used in
     * situations where a known peer starts off in a different location and we
     * need to change its address.
     * <p>
598      * Note that if the peerId does not match the list of peers passed to
599      * this actor during construction an IllegalStateException will be thrown.
600      *
601      * @param peerId
602      * @param peerAddress
603      */
604     protected void setPeerAddress(String peerId, String peerAddress){
605         context.setPeerAddress(peerId, peerAddress);
606     }
607
608     protected void commitSnapshot(long sequenceNumber) {
609         context.getSnapshotManager().commit(persistence(), sequenceNumber);
610     }
611
612     /**
613      * The applyState method will be called by the RaftActor when some data
614      * needs to be applied to the actor's state
615      *
616      * @param clientActor A reference to the client who sent this message. This
617      *                    is the same reference that was passed to persistData
618      *                    by the derived actor. clientActor may be null when
619      *                    the RaftActor is behaving as a follower or during
620      *                    recovery.
621      * @param identifier  The identifier of the persisted data. This is also
622      *                    the same identifier that was passed to persistData by
623      *                    the derived actor. identifier may be null when
624      *                    the RaftActor is behaving as a follower or during
625      *                    recovery
626      * @param data        A piece of data that was persisted by the persistData call.
627      *                    This should NEVER be null.
628      */
629     protected abstract void applyState(ActorRef clientActor, String identifier,
630         Object data);
631
632     /**
633      * This method is called during recovery at the start of a batch of state entries. Derived
634      * classes should perform any initialization needed to start a batch.
635      */
636     protected abstract void startLogRecoveryBatch(int maxBatchSize);
637
638     /**
639      * This method is called during recovery to append state data to the current batch. This method
640      * is called 1 or more times after {@link #startLogRecoveryBatch}.
641      *
642      * @param data the state data
643      */
644     protected abstract void appendRecoveredLogEntry(Payload data);
645
646     /**
647      * This method is called during recovery to reconstruct the state of the actor.
648      *
649      * @param snapshotBytes A snapshot of the state of the actor
650      */
651     protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
652
653     /**
654      * This method is called during recovery at the end of a batch to apply the current batched
655      * log entries. This method is called after {@link #appendRecoveredLogEntry}.
656      */
657     protected abstract void applyCurrentLogRecoveryBatch();
658
659     /**
660      * This method is called when recovery is complete.
661      */
662     protected abstract void onRecoveryComplete();
663
664     /**
     * This method will be called by the RaftActor when a snapshot needs to be
     * created. The derived actor should respond with its current state; this
     * method itself returns nothing (presumably the state is delivered in a
     * CaptureSnapshotReply message - confirm against the derived actors).
     * <p/>
     * During recovery the state that was captured by the derived actor will
     * be passed back to it by calling the applyRecoverySnapshot method
672      */
673     protected abstract void createSnapshot();
674
675     /**
676      * This method can be called at any other point during normal
677      * operations when the derived actor is out of sync with it's peers
678      * and the only way to bring it in sync is by applying a snapshot
679      *
680      * @param snapshotBytes A snapshot of the state of the actor
681      */
682     protected abstract void applySnapshot(byte[] snapshotBytes);
683
684     /**
685      * This method will be called by the RaftActor when the state of the
686      * RaftActor changes. The derived actor can then use methods like
687      * isLeader or getLeader to do something useful
688      */
689     protected abstract void onStateChanged();
690
691     protected abstract DataPersistenceProvider persistence();
692
693     /**
694      * Notifier Actor for this RaftActor to notify when a role change happens
695      * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
696      */
697     protected abstract Optional<ActorRef> getRoleChangeNotifier();
698
699     protected void onLeaderChanged(String oldLeader, String newLeader){};
700
701     private String getLeaderAddress(){
702         if(isLeader()){
703             return getSelf().path().toString();
704         }
705         String leaderId = currentBehavior.getLeaderId();
706         if (leaderId == null) {
707             return null;
708         }
709         String peerAddress = context.getPeerAddress(leaderId);
710         if(LOG.isDebugEnabled()) {
711             LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
712                     persistenceId(), leaderId, peerAddress);
713         }
714
715         return peerAddress;
716     }
717
718     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
719         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
720
721         context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
722     }
723
724     protected long getTotalMemory() {
725         return Runtime.getRuntime().totalMemory();
726     }
727
728     protected boolean hasFollowers(){
729         return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
730     }
731
732     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
733         private static final int DATA_SIZE_DIVIDER = 5;
734         private long dataSizeSinceLastSnapshot = 0L;
735
736
737         public ReplicatedLogImpl(Snapshot snapshot) {
738             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
739                 snapshot.getUnAppliedEntries());
740         }
741
742         public ReplicatedLogImpl() {
743             super();
744         }
745
746         @Override public void removeFromAndPersist(long logEntryIndex) {
747             int adjustedIndex = adjustedIndex(logEntryIndex);
748
749             if (adjustedIndex < 0) {
750                 return;
751             }
752
753             // FIXME: Maybe this should be done after the command is saved
754             journal.subList(adjustedIndex , journal.size()).clear();
755
756             persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
757
758                 @Override
759                 public void apply(DeleteEntries param)
760                         throws Exception {
761                     //FIXME : Doing nothing for now
762                     dataSize = 0;
763                     for (ReplicatedLogEntry entry : journal) {
764                         dataSize += entry.size();
765                     }
766                 }
767             });
768         }
769
770         @Override public void appendAndPersist(
771             final ReplicatedLogEntry replicatedLogEntry) {
772             appendAndPersist(replicatedLogEntry, null);
773         }
774
775         public void appendAndPersist(
776             final ReplicatedLogEntry replicatedLogEntry,
777             final Procedure<ReplicatedLogEntry> callback)  {
778
779             if(LOG.isDebugEnabled()) {
780                 LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
781             }
782
783             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
784             journal.add(replicatedLogEntry);
785
786             // When persisting events with persist it is guaranteed that the
787             // persistent actor will not receive further commands between the
788             // persist call and the execution(s) of the associated event
789             // handler. This also holds for multiple persist calls in context
790             // of a single command.
791             persistence().persist(replicatedLogEntry,
792                 new Procedure<ReplicatedLogEntry>() {
793                     @Override
794                     public void apply(ReplicatedLogEntry evt) throws Exception {
795                         int logEntrySize = replicatedLogEntry.size();
796
797                         dataSize += logEntrySize;
798                         long dataSizeForCheck = dataSize;
799
800                         dataSizeSinceLastSnapshot += logEntrySize;
801
802                         if (!hasFollowers()) {
803                             // When we do not have followers we do not maintain an in-memory log
804                             // due to this the journalSize will never become anything close to the
805                             // snapshot batch count. In fact will mostly be 1.
806                             // Similarly since the journal's dataSize depends on the entries in the
807                             // journal the journal's dataSize will never reach a value close to the
808                             // memory threshold.
809                             // By maintaining the dataSize outside the journal we are tracking essentially
810                             // what we have written to the disk however since we no longer are in
811                             // need of doing a snapshot just for the sake of freeing up memory we adjust
812                             // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
813                             // as if we were maintaining a real snapshot
814                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
815                         }
816                         long journalSize = replicatedLogEntry.getIndex() + 1;
817                         long dataThreshold = getTotalMemory() *
818                                 context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
819
820                         if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
821                                 || dataSizeForCheck > dataThreshold)) {
822
823                             boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
824                                     currentBehavior.getReplicatedToAllIndex());
825
826                             if(started){
827                                 dataSizeSinceLastSnapshot = 0;
828                             }
829
830                         }
831
832                         if (callback != null){
833                             callback.apply(replicatedLogEntry);
834                         }
835                     }
836                 }
837             );
838         }
839
840     }
841
842     static class DeleteEntries implements Serializable {
843         private static final long serialVersionUID = 1L;
844         private final int fromIndex;
845
846         public DeleteEntries(int fromIndex) {
847             this.fromIndex = fromIndex;
848         }
849
850         public int getFromIndex() {
851             return fromIndex;
852         }
853     }
854
855
856     private class ElectionTermImpl implements ElectionTerm {
857         /**
858          * Identifier of the actor whose election term information this is
859          */
860         private long currentTerm = 0;
861         private String votedFor = null;
862
863         @Override
864         public long getCurrentTerm() {
865             return currentTerm;
866         }
867
868         @Override
869         public String getVotedFor() {
870             return votedFor;
871         }
872
873         @Override public void update(long currentTerm, String votedFor) {
874             if(LOG.isDebugEnabled()) {
875                 LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
876             }
877             this.currentTerm = currentTerm;
878             this.votedFor = votedFor;
879         }
880
881         @Override
882         public void updateAndPersist(long currentTerm, String votedFor){
883             update(currentTerm, votedFor);
884             // FIXME : Maybe first persist then update the state
885             persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
886
887                 @Override public void apply(UpdateElectionTerm param)
888                     throws Exception {
889
890                 }
891             });
892         }
893     }
894
895     static class UpdateElectionTerm implements Serializable {
896         private static final long serialVersionUID = 1L;
897         private final long currentTerm;
898         private final String votedFor;
899
900         public UpdateElectionTerm(long currentTerm, String votedFor) {
901             this.currentTerm = currentTerm;
902             this.votedFor = votedFor;
903         }
904
905         public long getCurrentTerm() {
906             return currentTerm;
907         }
908
909         public String getVotedFor() {
910             return votedFor;
911         }
912     }
913
914     protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider {
915
916         public NonPersistentRaftDataProvider(){
917
918         }
919
920         /**
921          * The way snapshotting works is,
922          * <ol>
923          * <li> RaftActor calls createSnapshot on the Shard
924          * <li> Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot
925          * <li> When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot.
926          * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot
927          * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done
928          * in SaveSnapshotSuccess.
929          * </ol>
930          * @param o
931          */
932         @Override
933         public void saveSnapshot(Object o) {
934             // Make saving Snapshot successful
935             // Committing the snapshot here would end up calling commit in the creating state which would
936             // be a state violation. That's why now we send a message to commit the snapshot.
937             self().tell(COMMIT_SNAPSHOT, self());
938         }
939     }
940
941
942     private class CreateSnapshotProcedure implements Procedure<Void> {
943
944         @Override
945         public void apply(Void aVoid) throws Exception {
946             createSnapshot();
947         }
948     }
949
950     @VisibleForTesting
951     void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
952         currentBehavior = behavior;
953     }
954
955     protected RaftActorBehavior getCurrentBehavior() {
956         return currentBehavior;
957     }
958
959     private static class BehaviorStateHolder {
960         private RaftActorBehavior behavior;
961         private String leaderId;
962
963         void init(RaftActorBehavior behavior) {
964             this.behavior = behavior;
965             this.leaderId = behavior != null ? behavior.getLeaderId() : null;
966         }
967
968         RaftActorBehavior getBehavior() {
969             return behavior;
970         }
971
972         String getLeaderId() {
973             return leaderId;
974         }
975     }
976 }