64fa7496042466e58bd51cf0a488c265898866da
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import com.google.common.base.Optional;
23 import com.google.common.base.Stopwatch;
24 import com.google.protobuf.ByteString;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
27 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
28 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
29 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
30 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
31 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
32 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
33 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
36 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
37 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
38 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
39 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
41 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
42 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
43 import java.io.Serializable;
44 import java.util.Map;
45
46 /**
47  * RaftActor encapsulates a state machine that needs to be kept synchronized
48  * in a cluster. It implements the RAFT algorithm as described in the paper
49  * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
50  * In Search of an Understandable Consensus Algorithm</a>
51  * <p/>
52  * RaftActor has 3 states and each state has a certain behavior associated
53  * with it. A Raft actor can behave as,
54  * <ul>
55  * <li> A Leader </li>
56  * <li> A Follower (or) </li>
57  * <li> A Candidate </li>
58  * </ul>
59  * <p/>
60  * <p/>
 * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
 * a Leader it is also responsible for ensuring that all followers ultimately
 * have the same log and therefore the same state machine as itself.
65  * <p/>
66  * <p/>
67  * The current behavior of a RaftActor determines how election for leadership
68  * is initiated and how peer RaftActors react to request for votes.
69  * <p/>
70  * <p/>
 * Each RaftActor also needs to know the current election term. It uses this
 * information for a couple of things. One is to simply figure out who it
 * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
75  * <p/>
76  * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a RaftActor determines
79  * <p/>
80  * <ul>
81  * <li> when a log entry should be persisted </li>
82  * <li> when a log entry should be applied to the state machine (and) </li>
83  * <li> when a snapshot should be saved </li>
84  * </ul>
85  */
86 public abstract class RaftActor extends UntypedPersistentActor {
87     protected final LoggingAdapter LOG =
88         Logging.getLogger(getContext().system(), this);
89
90     /**
91      * The current state determines the current behavior of a RaftActor
92      * A Raft Actor always starts off in the Follower State
93      */
94     private RaftActorBehavior currentBehavior;
95
96     /**
97      * This context should NOT be passed directly to any other actor it is
98      * only to be consumed by the RaftActorBehaviors
99      */
100     private final RaftActorContext context;
101
102     /**
103      * The in-memory journal
104      */
105     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
106
    // Snapshot request currently being processed: assigned when a CaptureSnapshot
    // message is received and cleared once the matching CaptureSnapshotReply has
    // been handled.
    private CaptureSnapshot captureSnapshot = null;

    // Set to true when a CaptureSnapshot message has been sent to self and reset
    // after the snapshot reply is handled, so that a second capture is not
    // triggered while one is still outstanding.
    private volatile boolean hasSnapshotCaptureInitiated = false;

    // Measures the total recovery time. Created lazily by initRecoveryTimer() and
    // set back to null when recovery completes.
    private Stopwatch recoveryTimer;

    // Number of recovered log entries appended to the batch currently being built.
    private int currentRecoveryBatchCount;
114
115     public RaftActor(String id, Map<String, String> peerAddresses) {
116         this(id, peerAddresses, Optional.<ConfigParams>absent());
117     }
118
119     public RaftActor(String id, Map<String, String> peerAddresses,
120          Optional<ConfigParams> configParams) {
121
122         context = new RaftActorContextImpl(this.getSelf(),
123             this.getContext(), id, new ElectionTermImpl(),
124             -1, -1, replicatedLog, peerAddresses,
125             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
126             LOG);
127     }
128
129     private void initRecoveryTimer() {
130         if(recoveryTimer == null) {
131             recoveryTimer = new Stopwatch();
132             recoveryTimer.start();
133         }
134     }
135
136     @Override
137     public void preStart() throws Exception {
138         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
139                 context.getConfigParams().getJournalRecoveryLogBatchSize());
140         super.preStart();
141     }
142
143     @Override
144     public void onReceiveRecover(Object message) {
145         if (message instanceof SnapshotOffer) {
146             onRecoveredSnapshot((SnapshotOffer)message);
147         } else if (message instanceof ReplicatedLogEntry) {
148             onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
149         } else if (message instanceof ApplyLogEntries) {
150             onRecoveredApplyLogEntries((ApplyLogEntries)message);
151         } else if (message instanceof DeleteEntries) {
152             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
153         } else if (message instanceof UpdateElectionTerm) {
154             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
155                     ((UpdateElectionTerm) message).getVotedFor());
156         } else if (message instanceof RecoveryCompleted) {
157             onRecoveryCompletedMessage();
158         }
159     }
160
161     private void onRecoveredSnapshot(SnapshotOffer offer) {
162         LOG.debug("SnapshotOffer called..");
163
164         initRecoveryTimer();
165
166         Snapshot snapshot = (Snapshot) offer.snapshot();
167
168         // Create a replicated log with the snapshot information
169         // The replicated log can be used later on to retrieve this snapshot
170         // when we need to install it on a peer
171         replicatedLog = new ReplicatedLogImpl(snapshot);
172
173         context.setReplicatedLog(replicatedLog);
174         context.setLastApplied(snapshot.getLastAppliedIndex());
175         context.setCommitIndex(snapshot.getLastAppliedIndex());
176
177         Stopwatch timer = new Stopwatch();
178         timer.start();
179
180         // Apply the snapshot to the actors state
181         applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
182
183         timer.stop();
184         LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
185                 replicatedLog.size(), persistenceId(), timer.toString(),
186                 replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
187     }
188
    /**
     * Handles a journal entry replayed during recovery by appending it to the
     * in-memory replicated log. The entry is not applied to the state here; that
     * happens when a recovered ApplyLogEntries message is processed.
     *
     * @param logEntry the recovered log entry
     */
    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
        if(LOG.isDebugEnabled()) {
            LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
        }

        replicatedLog.append(logEntry);
    }
196
197     private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
198         if(LOG.isDebugEnabled()) {
199             LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
200                     context.getLastApplied() + 1, ale.getToIndex());
201         }
202
203         for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
204             batchRecoveredLogEntry(replicatedLog.get(i));
205         }
206
207         context.setLastApplied(ale.getToIndex());
208         context.setCommitIndex(ale.getToIndex());
209     }
210
211     private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
212         initRecoveryTimer();
213
214         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
215         if(currentRecoveryBatchCount == 0) {
216             startLogRecoveryBatch(batchSize);
217         }
218
219         appendRecoveredLogEntry(logEntry.getData());
220
221         if(++currentRecoveryBatchCount >= batchSize) {
222             endCurrentLogRecoveryBatch();
223         }
224     }
225
    /**
     * Asks the derived actor to apply the batch built so far and then resets the
     * batch counter so the next recovered entry starts a new batch.
     */
    private void endCurrentLogRecoveryBatch() {
        applyCurrentLogRecoveryBatch();
        // Reset only after the batch was applied.
        currentRecoveryBatchCount = 0;
    }
230
231     private void onRecoveryCompletedMessage() {
232         if(currentRecoveryBatchCount > 0) {
233             endCurrentLogRecoveryBatch();
234         }
235
236         onRecoveryComplete();
237
238         String recoveryTime = "";
239         if(recoveryTimer != null) {
240             recoveryTimer.stop();
241             recoveryTime = " in " + recoveryTimer.toString();
242             recoveryTimer = null;
243         }
244
245         LOG.info(
246             "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
247                 "Persistence Id =  " + persistenceId() +
248                 " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
249                 "journal-size={}",
250             replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
251             replicatedLog.snapshotTerm, replicatedLog.size());
252
253         currentBehavior = switchBehavior(RaftState.Follower);
254         onStateChanged();
255     }
256
    /**
     * Handles a message received during normal (post-recovery) operation.
     * <p>
     * Messages that concern applying state, snapshots, peer membership and leader
     * lookup are handled directly here. Every other message is delegated to the
     * current behavior, whose returned state may cause a behavior switch.
     *
     * @param message the received message
     */
    @Override public void onReceiveCommand(Object message) {
        if (message instanceof ApplyState){
            // Hand the data of a log entry over to the derived actor's state machine.
            ApplyState applyState = (ApplyState) message;

            if(LOG.isDebugEnabled()) {
                LOG.debug("Applying state for log index {} data {}",
                    applyState.getReplicatedLogEntry().getIndex(),
                    applyState.getReplicatedLogEntry().getData());
            }

            applyState(applyState.getClientActor(), applyState.getIdentifier(),
                applyState.getReplicatedLogEntry().getData());

        } else if (message instanceof ApplyLogEntries){
            // Persist a marker with the applied index; onReceiveRecover replays it to
            // re-apply journal entries up to that index during recovery.
            ApplyLogEntries ale = (ApplyLogEntries) message;
            if(LOG.isDebugEnabled()) {
                LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
            }
            persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
                @Override
                public void apply(ApplyLogEntries param) throws Exception {
                    // Nothing to do once the marker is persisted.
                }
            });

        } else if(message instanceof ApplySnapshot ) {
            // Replace the derived actor's state and the in-memory log with the snapshot.
            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();

            if(LOG.isDebugEnabled()) {
                LOG.debug("ApplySnapshot called on Follower Actor " +
                        "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
                    snapshot.getLastAppliedTerm()
                );
            }
            applySnapshot(ByteString.copyFrom(snapshot.getState()));

            //clears the followers log, sets the snapshot index to ensure adjusted-index works
            replicatedLog = new ReplicatedLogImpl(snapshot);
            context.setReplicatedLog(replicatedLog);
            // NOTE(review): unlike onRecoveredSnapshot, the commit index is not updated
            // here - confirm whether that is intentional.
            context.setLastApplied(snapshot.getLastAppliedIndex());

        } else if (message instanceof FindLeader) {
            // Reply with the leader's address, which is null when no leader is known.
            getSender().tell(
                new FindLeaderReply(getLeaderAddress()),
                getSelf()
            );

        } else if (message instanceof SaveSnapshotSuccess) {
            // The snapshot is durable: commit the log's pending snapshot state and trim
            // older persisted snapshots and journal messages.
            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
            LOG.info("SaveSnapshotSuccess received for snapshot");

            context.getReplicatedLog().snapshotCommit();

            // TODO: Not sure if we want to be this aggressive with trimming stuff
            trimPersistentData(success.metadata().sequenceNr());

        } else if (message instanceof SaveSnapshotFailure) {
            // Saving failed: roll the log back to its state before the snapshot pre-commit.
            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;

            LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
            LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");

            context.getReplicatedLog().snapshotRollback();

            LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
                context.getReplicatedLog().getSnapshotIndex(),
                context.getReplicatedLog().getSnapshotTerm(),
                context.getReplicatedLog().size());

        } else if (message instanceof AddRaftPeer){

            // FIXME : Do not add raft peers like this.
            // When adding a new Peer we have to ensure that a majority of
            // the peers know about the new Peer. Doing it this way may cause
            // a situation where multiple Leaders may emerge
            AddRaftPeer arp = (AddRaftPeer)message;
           context.addToPeers(arp.getName(), arp.getAddress());

        } else if (message instanceof RemoveRaftPeer){

            RemoveRaftPeer rrp = (RemoveRaftPeer)message;
            context.removePeer(rrp.getName());

        } else if (message instanceof CaptureSnapshot) {
            // Remember the request and ask the derived actor for its state; the state
            // comes back later as a CaptureSnapshotReply.
            LOG.info("CaptureSnapshot received by actor");
            CaptureSnapshot cs = (CaptureSnapshot)message;
            captureSnapshot = cs;
            createSnapshot();

        } else if (message instanceof CaptureSnapshotReply){
            // The derived actor delivered its serialized state; save it as a snapshot.
            LOG.info("CaptureSnapshotReply received by actor");
            CaptureSnapshotReply csr = (CaptureSnapshotReply) message;

            ByteString stateInBytes = csr.getSnapshot();
            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
            handleCaptureSnapshotReply(stateInBytes);

        } else {
            // Skip debug logging for the frequent append-entries and heartbeat traffic.
            if (!(message instanceof AppendEntriesMessages.AppendEntries)
                && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
                if(LOG.isDebugEnabled()) {
                    LOG.debug("onReceiveCommand: message:" + message.getClass());
                }
            }

            // Let the current behavior handle the message; it returns the state this
            // actor should be in afterwards.
            RaftState state =
                currentBehavior.handleMessage(getSender(), message);
            RaftActorBehavior oldBehavior = currentBehavior;
            currentBehavior = switchBehavior(state);
            if(oldBehavior != currentBehavior){
                onStateChanged();
            }

            // Invoked after every delegated message, whether or not the leader id differs.
            onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
        }
    }
373
    /**
     * Returns the key set of the context's peer address map.
     *
     * @return the keys under which peer addresses are registered
     */
    public java.util.Set<String> getPeers() {
        return context.getPeerAddresses().keySet();
    }
377
378     protected String getReplicatedLogState() {
379         return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
380             + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
381             + ", im-mem journal size=" + context.getReplicatedLog().size();
382     }
383
384
385     /**
386      * When a derived RaftActor needs to persist something it must call
387      * persistData.
388      *
389      * @param clientActor
390      * @param identifier
391      * @param data
392      */
393     protected void persistData(ActorRef clientActor, String identifier,
394         Payload data) {
395
396         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
397             context.getReplicatedLog().lastIndex() + 1,
398             context.getTermInformation().getCurrentTerm(), data);
399
400         if(LOG.isDebugEnabled()) {
401             LOG.debug("Persist data {}", replicatedLogEntry);
402         }
403
404         replicatedLog
405             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
406     }
407
    /**
     * @return the id of this actor as held by its context
     */
    protected String getId() {
        return context.getId();
    }
411
    /**
     * Derived actors can call the isLeader method to check if the current
     * RaftActor is the Leader or not.
     * <p>
     * The check compares this actor's id with the leader id reported by the current
     * behavior. The current behavior is only assigned once recovery has completed, so
     * calling this earlier results in a NullPointerException.
     *
     * @return true if this RaftActor is a Leader, false otherwise
     */
    protected boolean isLeader() {
        return context.getId().equals(currentBehavior.getLeaderId());
    }
421
422     /**
423      * Derived actor can call getLeader if they need a reference to the Leader.
424      * This would be useful for example in forwarding a request to an actor
425      * which is the leader
426      *
427      * @return A reference to the leader if known, null otherwise
428      */
429     protected ActorSelection getLeader(){
430         String leaderAddress = getLeaderAddress();
431
432         if(leaderAddress == null){
433             return null;
434         }
435
436         return context.actorSelection(leaderAddress);
437     }
438
    /**
     * Returns the leader id reported by the current behavior. The current behavior is
     * only assigned once recovery has completed.
     *
     * @return the current leader's id
     */
    protected String getLeaderId(){
        return currentBehavior.getLeaderId();
    }
446
    /**
     * Returns the state of the current behavior. The current behavior is only assigned
     * once recovery has completed.
     *
     * @return the Raft state this actor is currently in
     */
    protected RaftState getRaftState() {
        return currentBehavior.state();
    }
450
    /**
     * @return the last entry of the in-memory replicated log, as returned by its
     *         {@code last()} method
     */
    protected ReplicatedLogEntry getLastLogEntry() {
        return replicatedLog.last();
    }
454
    /**
     * @return the current election term held by the context's term information
     */
    protected Long getCurrentTerm(){
        return context.getTermInformation().getCurrentTerm();
    }
458
    /**
     * @return the commit index held by the context
     */
    protected Long getCommitIndex(){
        return context.getCommitIndex();
    }
462
    /**
     * @return the last-applied index held by the context
     */
    protected Long getLastApplied(){
        return context.getLastApplied();
    }
466
    /**
     * Gives derived actors access to the context of this RaftActor. The context is
     * meant to be consumed by the RaftActorBehaviors and should not be passed to
     * other actors.
     *
     * @return the context of this actor
     */
    protected RaftActorContext getRaftActorContext() {
        return context;
    }
470
    /**
     * setPeerAddress sets the address of a known peer at a later time.
     * <p>
     * This is to account for situations where we know that a peer
     * exists but we do not know an address up-front. This may also be used in
     * situations where a known peer starts off in a different location and we
     * need to change its address.
     * <p>
     * Note that if the peerId does not match the list of peers passed to
     * this actor during construction an IllegalStateException will be thrown.
     *
     * @param peerId      the id of the peer whose address is being set
     * @param peerAddress the new address of that peer
     */
    protected void setPeerAddress(String peerId, String peerAddress){
        context.setPeerAddress(peerId, peerAddress);
    }
488
489
490
    /**
     * The applyState method will be called by the RaftActor when some data
     * needs to be applied to the actor's state. It is invoked when an ApplyState
     * message is received, with the data of that message's log entry.
     *
     * @param clientActor A reference to the client who sent this message. This
     *                    is the same reference that was passed to persistData
     *                    by the derived actor. clientActor may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param identifier  The identifier of the persisted data. This is also
     *                    the same identifier that was passed to persistData by
     *                    the derived actor. identifier may be null when
     *                    the RaftActor is behaving as a follower or during
     *                    recovery.
     * @param data        A piece of data that was persisted by the persistData call.
     *                    This should NEVER be null.
     */
    protected abstract void applyState(ActorRef clientActor, String identifier,
        Object data);
510
    /**
     * This method is called during recovery at the start of a batch of state entries. Derived
     * classes should perform any initialization needed to start a batch.
     *
     * @param maxBatchSize the configured journal recovery batch size, i.e. the number of
     *                     entries after which the batch is applied
     */
    protected abstract void startLogRecoveryBatch(int maxBatchSize);
516
    /**
     * This method is called during recovery to append state data to the current batch. This method
     * is called 1 or more times after {@link #startLogRecoveryBatch}.
     *
     * @param data the state data
     */
    protected abstract void appendRecoveredLogEntry(Payload data);
524
    /**
     * This method is called during recovery, when a snapshot is offered, to reconstruct
     * the state of the actor from that snapshot.
     *
     * @param snapshot A snapshot of the state of the actor
     */
    protected abstract void applyRecoverySnapshot(ByteString snapshot);
531
    /**
     * This method is called during recovery at the end of a batch to apply the current batched
     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
     */
    protected abstract void applyCurrentLogRecoveryBatch();
537
    /**
     * This method is called when recovery is complete, after any remaining recovery
     * batch has been applied and before the actor switches to the Follower behavior.
     */
    protected abstract void onRecoveryComplete();
542
    /**
     * This method will be called by the RaftActor when a snapshot needs to be
     * created. The method itself returns nothing; the derived actor is expected to
     * respond with its current state by sending a CaptureSnapshotReply message to
     * this actor.
     * <p/>
     * During recovery the state that was saved this way will be passed back to the
     * derived actor by calling the applyRecoverySnapshot method.
     */
    protected abstract void createSnapshot();
553
    /**
     * This method can be called at any other point during normal
     * operations when the derived actor is out of sync with its peers
     * and the only way to bring it in sync is by applying a snapshot.
     * It is invoked when an ApplySnapshot message is received.
     *
     * @param snapshot A snapshot of the state of the actor
     */
    protected abstract void applySnapshot(ByteString snapshot);
562
    /**
     * This method will be called by the RaftActor when the state of the
     * RaftActor changes, i.e. when recovery completes and whenever handling a
     * message results in a different behavior. The derived actor can then use
     * methods like isLeader or getLeader to do something useful.
     */
    protected abstract void onStateChanged();
569
570     protected void onLeaderChanged(String oldLeader, String newLeader){};
571
572     private RaftActorBehavior switchBehavior(RaftState state) {
573         if (currentBehavior != null) {
574             if (currentBehavior.state() == state) {
575                 return currentBehavior;
576             }
577             LOG.info("Switching from state " + currentBehavior.state() + " to "
578                 + state);
579
580             try {
581                 currentBehavior.close();
582             } catch (Exception e) {
583                 LOG.error(e,
584                     "Failed to close behavior : " + currentBehavior.state());
585             }
586
587         } else {
588             LOG.info("Switching behavior to " + state);
589         }
590         RaftActorBehavior behavior = null;
591         if (state == RaftState.Candidate) {
592             behavior = new Candidate(context);
593         } else if (state == RaftState.Follower) {
594             behavior = new Follower(context);
595         } else {
596             behavior = new Leader(context);
597         }
598
599
600
601         return behavior;
602     }
603
604     private void trimPersistentData(long sequenceNumber) {
605         // Trim akka snapshots
606         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
607         // For now guessing that it is ANDed.
608         deleteSnapshots(new SnapshotSelectionCriteria(
609             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
610
611         // Trim akka journal
612         deleteMessages(sequenceNumber);
613     }
614
615     private String getLeaderAddress(){
616         if(isLeader()){
617             return getSelf().path().toString();
618         }
619         String leaderId = currentBehavior.getLeaderId();
620         if (leaderId == null) {
621             return null;
622         }
623         String peerAddress = context.getPeerAddress(leaderId);
624         if(LOG.isDebugEnabled()) {
625             LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
626                 + peerAddress);
627         }
628
629         return peerAddress;
630     }
631
632     private void handleCaptureSnapshotReply(ByteString stateInBytes) {
633         // create a snapshot object from the state provided and save it
634         // when snapshot is saved async, SaveSnapshotSuccess is raised.
635
636         Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
637             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
638             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
639             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
640
641         saveSnapshot(sn);
642
643         LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
644
645         //be greedy and remove entries from in-mem journal which are in the snapshot
646         // and update snapshotIndex and snapshotTerm without waiting for the success,
647
648         context.getReplicatedLog().snapshotPreCommit(stateInBytes,
649             captureSnapshot.getLastAppliedIndex(),
650             captureSnapshot.getLastAppliedTerm());
651
652         LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
653             "and term:{}", captureSnapshot.getLastAppliedIndex(),
654             captureSnapshot.getLastAppliedTerm());
655
656         captureSnapshot = null;
657         hasSnapshotCaptureInitiated = false;
658     }
659
660
    /**
     * The replicated log of this actor: an in-memory journal (inherited from
     * AbstractReplicatedLogImpl) whose modifications are also written to the
     * akka-persistence journal of the enclosing actor.
     */
    private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {

        /**
         * Creates a log initialised from a snapshot: its state, last applied index and
         * term, and the entries that were not yet applied when the snapshot was taken.
         *
         * @param snapshot the snapshot to initialise the log from
         */
        public ReplicatedLogImpl(Snapshot snapshot) {
            super(ByteString.copyFrom(snapshot.getState()),
                snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
                snapshot.getUnAppliedEntries());
        }

        /** Creates a log using the superclass defaults. */
        public ReplicatedLogImpl() {
            super();
        }

        /**
         * Removes all entries from the given log index to the end of the in-memory
         * journal and persists a DeleteEntries marker. Does nothing when the index
         * maps to a negative position in the journal.
         *
         * @param logEntryIndex the log index of the first entry to remove
         */
        @Override public void removeFromAndPersist(long logEntryIndex) {
            int adjustedIndex = adjustedIndex(logEntryIndex);

            if (adjustedIndex < 0) {
                return;
            }

            // FIXME: Maybe this should be done after the command is saved
            journal.subList(adjustedIndex , journal.size()).clear();

            // NOTE(review): the marker stores the adjusted (journal position) index, not
            // logEntryIndex, while recovery passes the stored value to removeFrom().
            // If removeFrom() expects a log index and adjusts it again, replay removes
            // the wrong range once a snapshot exists - confirm against removeFrom().
            persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){

                @Override public void apply(DeleteEntries param)
                    throws Exception {
                    //FIXME : Doing nothing for now
                }
            });
        }

        /**
         * Appends and persists an entry without a client actor or identifier, so no
         * Replicate message is sent once the entry is persisted.
         *
         * @param replicatedLogEntry the entry to append and persist
         */
        @Override public void appendAndPersist(
            final ReplicatedLogEntry replicatedLogEntry) {
            appendAndPersist(null, null, replicatedLogEntry);
        }

        /**
         * Appends the entry to the in-memory journal and persists it. Once persisted,
         * a snapshot capture is initiated if one is due, and, when a client actor was
         * given, the current behavior is asked to replicate the entry.
         *
         * @param clientActor        the client on whose behalf the entry is persisted; may be null
         * @param identifier         the identifier of the persisted data; may be null
         * @param replicatedLogEntry the entry to append and persist
         */
        public void appendAndPersist(final ActorRef clientActor,
            final String identifier,
            final ReplicatedLogEntry replicatedLogEntry) {
            context.getLogger().debug(
                "Append log entry and persist {} ", replicatedLogEntry);
            // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
            journal.add(replicatedLogEntry);

            // When persisting events with persist it is guaranteed that the
            // persistent actor will not receive further commands between the
            // persist call and the execution(s) of the associated event
            // handler. This also holds for multiple persist calls in context
            // of a single command.
            persist(replicatedLogEntry,
                new Procedure<ReplicatedLogEntry>() {
                    @Override
                    public void apply(ReplicatedLogEntry evt) throws Exception {
                        // Start a capture only when none is outstanding and the journal
                        // size is a multiple of the configured snapshot batch count.
                        if (hasSnapshotCaptureInitiated == false &&
                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {

                            LOG.info("Initiating Snapshot Capture..");
                            // -1 is used for both values when no last-applied entry is found.
                            long lastAppliedIndex = -1;
                            long lastAppliedTerm = -1;

                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
                            if (lastAppliedEntry != null) {
                                lastAppliedIndex = lastAppliedEntry.getIndex();
                                lastAppliedTerm = lastAppliedEntry.getTerm();
                            }

                            if(LOG.isDebugEnabled()) {
                                LOG.debug("Snapshot Capture logSize: {}", journal.size());
                                LOG.debug("Snapshot Capture lastApplied:{} ",
                                    context.getLastApplied());
                                LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
                                LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
                            }

                            // send a CaptureSnapshot to self to make the expensive operation async.
                            getSelf().tell(new CaptureSnapshot(
                                lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
                                null);
                            hasSnapshotCaptureInitiated = true;
                        }
                        // Send message for replication
                        if (clientActor != null) {
                            currentBehavior.handleMessage(getSelf(),
                                new Replicate(clientActor, identifier,
                                    replicatedLogEntry)
                            );
                        }
                    }
                }
            );
        }

    }
755
    /**
     * Journal event recording that log entries were removed from a given index
     * onwards. It is persisted by removeFromAndPersist and replayed during recovery.
     * <p>
     * The class is written to the journal via Java serialization and declares no
     * serialVersionUID, so changes to its shape can make existing journals unreadable.
     */
    private static class DeleteEntries implements Serializable {
        // Index from which entries were removed, as supplied by the code that persisted it.
        private final int fromIndex;


        /**
         * @param fromIndex the index from which entries were removed
         */
        public DeleteEntries(int fromIndex) {
            this.fromIndex = fromIndex;
        }

        /**
         * @return the index from which entries were removed
         */
        public int getFromIndex() {
            return fromIndex;
        }
    }
768
769
770     private class ElectionTermImpl implements ElectionTerm {
771         /**
772          * Identifier of the actor whose election term information this is
773          */
774         private long currentTerm = 0;
775         private String votedFor = null;
776
777         @Override
778         public long getCurrentTerm() {
779             return currentTerm;
780         }
781
782         @Override
783         public String getVotedFor() {
784             return votedFor;
785         }
786
787         @Override public void update(long currentTerm, String votedFor) {
788             if(LOG.isDebugEnabled()) {
789                 LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
790             }
791             this.currentTerm = currentTerm;
792             this.votedFor = votedFor;
793         }
794
795         @Override
796         public void updateAndPersist(long currentTerm, String votedFor){
797             update(currentTerm, votedFor);
798             // FIXME : Maybe first persist then update the state
799             persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
800
801                 @Override public void apply(UpdateElectionTerm param)
802                     throws Exception {
803
804                 }
805             });
806         }
807     }
808
    /**
     * Journal event recording a change of the election term information. It is
     * persisted by ElectionTermImpl.updateAndPersist and replayed during recovery to
     * restore the term and vote.
     * <p>
     * The class is written to the journal via Java serialization and declares no
     * serialVersionUID, so changes to its shape can make existing journals unreadable.
     */
    private static class UpdateElectionTerm implements Serializable {
        // Election term at the time the event was persisted.
        private final long currentTerm;
        // Id voted for in that term, as supplied by the code that persisted the event.
        private final String votedFor;

        /**
         * @param currentTerm the election term to record
         * @param votedFor    the id voted for in that term
         */
        public UpdateElectionTerm(long currentTerm, String votedFor) {
            this.currentTerm = currentTerm;
            this.votedFor = votedFor;
        }

        /**
         * @return the recorded election term
         */
        public long getCurrentTerm() {
            return currentTerm;
        }

        /**
         * @return the recorded vote
         */
        public String getVotedFor() {
            return votedFor;
        }
    }
826
827 }