BUG 1815 - Do not allow Shards to be created till an appropriate schema context is...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.event.Logging;
14 import akka.event.LoggingAdapter;
15 import akka.japi.Procedure;
16 import akka.persistence.RecoveryCompleted;
17 import akka.persistence.SaveSnapshotFailure;
18 import akka.persistence.SaveSnapshotSuccess;
19 import akka.persistence.SnapshotOffer;
20 import akka.persistence.SnapshotSelectionCriteria;
21 import akka.persistence.UntypedPersistentActor;
22 import com.google.common.base.Optional;
23 import com.google.protobuf.ByteString;
24 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
25 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
26 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
27 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
28 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
29 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
30 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
31 import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
32 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
33 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
34 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
35 import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer;
36 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
37 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
38 import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer;
39 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
40 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
41 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
42
43 import java.io.Serializable;
44 import java.util.Map;
45
/**
 * RaftActor encapsulates a state machine that needs to be kept synchronized
 * in a cluster. It implements the RAFT algorithm as described in the paper
 * <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
 * In Search of an Understandable Consensus Algorithm</a>
 * <p/>
 * RaftActor has 3 states and each state has a certain behavior associated
 * with it. A Raft actor can behave as,
 * <ul>
 * <li> A Leader </li>
 * <li> A Follower (or) </li>
 * <li> A Candidate </li>
 * </ul>
 * <p/>
 * <p/>
 * A RaftActor MUST be a Leader in order to accept requests from clients to
 * change the state of its encapsulated state machine. Once a RaftActor becomes
 * a Leader it is also responsible for ensuring that all followers ultimately
 * have the same log and therefore the same state machine as itself.
 * <p/>
 * <p/>
 * The current behavior of a RaftActor determines how an election for leadership
 * is initiated and how peer RaftActors react to requests for votes.
 * <p/>
 * <p/>
 * Each RaftActor also needs to know the current election term. It uses this
 * information for a couple of things. One is to simply figure out who it
 * voted for in the last election. Another is to figure out if the message
 * it received to update its state is stale.
 * <p/>
 * <p/>
 * The RaftActor uses akka-persistence to store its replicated log.
 * Furthermore, through its behaviors a RaftActor determines
 * <p/>
 * <ul>
 * <li> when a log entry should be persisted </li>
 * <li> when a log entry should be applied to the state machine (and) </li>
 * <li> when a snapshot should be saved </li>
 * </ul>
 */
86 public abstract class RaftActor extends UntypedPersistentActor {
87     protected final LoggingAdapter LOG =
88         Logging.getLogger(getContext().system(), this);
89
90     /**
91      * The current state determines the current behavior of a RaftActor
92      * A Raft Actor always starts off in the Follower State
93      */
94     private RaftActorBehavior currentBehavior;
95
96     /**
97      * This context should NOT be passed directly to any other actor it is
98      * only to be consumed by the RaftActorBehaviors
99      */
100     protected RaftActorContext context;
101
102     /**
103      * The in-memory journal
104      */
105     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
106
107     private CaptureSnapshot captureSnapshot = null;
108
109     private volatile boolean hasSnapshotCaptureInitiated = false;
110
111     public RaftActor(String id, Map<String, String> peerAddresses) {
112         this(id, peerAddresses, Optional.<ConfigParams>absent());
113     }
114
115     public RaftActor(String id, Map<String, String> peerAddresses,
116          Optional<ConfigParams> configParams) {
117
118         context = new RaftActorContextImpl(this.getSelf(),
119             this.getContext(), id, new ElectionTermImpl(),
120             -1, -1, replicatedLog, peerAddresses,
121             (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
122             LOG);
123     }
124
125     @Override public void onReceiveRecover(Object message) {
126         if (message instanceof SnapshotOffer) {
127             LOG.info("SnapshotOffer called..");
128             SnapshotOffer offer = (SnapshotOffer) message;
129             Snapshot snapshot = (Snapshot) offer.snapshot();
130
131             // Create a replicated log with the snapshot information
132             // The replicated log can be used later on to retrieve this snapshot
133             // when we need to install it on a peer
134             replicatedLog = new ReplicatedLogImpl(snapshot);
135
136             context.setReplicatedLog(replicatedLog);
137             context.setLastApplied(snapshot.getLastAppliedIndex());
138             context.setCommitIndex(snapshot.getLastAppliedIndex());
139
140             LOG.info("Applied snapshot to replicatedLog. " +
141                     "snapshotIndex={}, snapshotTerm={}, journal-size={}",
142                 replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
143                 replicatedLog.size()
144             );
145
146             // Apply the snapshot to the actors state
147             applySnapshot(ByteString.copyFrom(snapshot.getState()));
148
149         } else if (message instanceof ReplicatedLogEntry) {
150             ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
151             if(LOG.isDebugEnabled()) {
152                 LOG.debug("Received ReplicatedLogEntry for recovery:{}", logEntry.getIndex());
153             }
154             replicatedLog.append(logEntry);
155
156         } else if (message instanceof ApplyLogEntries) {
157             ApplyLogEntries ale = (ApplyLogEntries) message;
158
159             if(LOG.isDebugEnabled()) {
160                 LOG.debug("Received ApplyLogEntries for recovery, applying to state:{} to {}",
161                     context.getLastApplied() + 1, ale.getToIndex());
162             }
163
164             for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
165                 applyState(null, "recovery", replicatedLog.get(i).getData());
166             }
167             context.setLastApplied(ale.getToIndex());
168             context.setCommitIndex(ale.getToIndex());
169
170         } else if (message instanceof DeleteEntries) {
171             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
172
173         } else if (message instanceof UpdateElectionTerm) {
174             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
175                 ((UpdateElectionTerm) message).getVotedFor());
176
177         } else if (message instanceof RecoveryCompleted) {
178             LOG.info(
179                 "RecoveryCompleted - Switching actor to Follower - " +
180                     "Persistence Id =  " + persistenceId() +
181                     " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
182                     "journal-size={}",
183                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
184                 replicatedLog.snapshotTerm, replicatedLog.size());
185             currentBehavior = switchBehavior(RaftState.Follower);
186             onStateChanged();
187         }
188     }
189
190     @Override public void onReceiveCommand(Object message) {
191         if (message instanceof ApplyState){
192             ApplyState applyState = (ApplyState) message;
193
194             if(LOG.isDebugEnabled()) {
195                 LOG.debug("Applying state for log index {} data {}",
196                     applyState.getReplicatedLogEntry().getIndex(),
197                     applyState.getReplicatedLogEntry().getData());
198             }
199
200             applyState(applyState.getClientActor(), applyState.getIdentifier(),
201                 applyState.getReplicatedLogEntry().getData());
202
203         } else if (message instanceof ApplyLogEntries){
204             ApplyLogEntries ale = (ApplyLogEntries) message;
205             if(LOG.isDebugEnabled()) {
206                 LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex());
207             }
208             persist(new ApplyLogEntries(ale.getToIndex()), new Procedure<ApplyLogEntries>() {
209                 @Override
210                 public void apply(ApplyLogEntries param) throws Exception {
211                 }
212             });
213
214         } else if(message instanceof ApplySnapshot ) {
215             Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
216
217             if(LOG.isDebugEnabled()) {
218                 LOG.debug("ApplySnapshot called on Follower Actor " +
219                         "snapshotIndex:{}, snapshotTerm:{}", snapshot.getLastAppliedIndex(),
220                     snapshot.getLastAppliedTerm()
221                 );
222             }
223             applySnapshot(ByteString.copyFrom(snapshot.getState()));
224
225             //clears the followers log, sets the snapshot index to ensure adjusted-index works
226             replicatedLog = new ReplicatedLogImpl(snapshot);
227             context.setReplicatedLog(replicatedLog);
228             context.setLastApplied(snapshot.getLastAppliedIndex());
229
230         } else if (message instanceof FindLeader) {
231             getSender().tell(
232                 new FindLeaderReply(getLeaderAddress()),
233                 getSelf()
234             );
235
236         } else if (message instanceof SaveSnapshotSuccess) {
237             SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
238             LOG.info("SaveSnapshotSuccess received for snapshot");
239
240             context.getReplicatedLog().snapshotCommit();
241
242             // TODO: Not sure if we want to be this aggressive with trimming stuff
243             trimPersistentData(success.metadata().sequenceNr());
244
245         } else if (message instanceof SaveSnapshotFailure) {
246             SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
247
248             LOG.info("saveSnapshotFailure.metadata():{}", saveSnapshotFailure.metadata().toString());
249             LOG.error(saveSnapshotFailure.cause(), "SaveSnapshotFailure received for snapshot Cause:");
250
251             context.getReplicatedLog().snapshotRollback();
252
253             LOG.info("Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
254                 "snapshotIndex:{}, snapshotTerm:{}, log-size:{}",
255                 context.getReplicatedLog().getSnapshotIndex(),
256                 context.getReplicatedLog().getSnapshotTerm(),
257                 context.getReplicatedLog().size());
258
259         } else if (message instanceof AddRaftPeer){
260
261             // FIXME : Do not add raft peers like this.
262             // When adding a new Peer we have to ensure that the a majority of
263             // the peers know about the new Peer. Doing it this way may cause
264             // a situation where multiple Leaders may emerge
265             AddRaftPeer arp = (AddRaftPeer)message;
266            context.addToPeers(arp.getName(), arp.getAddress());
267
268         } else if (message instanceof RemoveRaftPeer){
269
270             RemoveRaftPeer rrp = (RemoveRaftPeer)message;
271             context.removePeer(rrp.getName());
272
273         } else if (message instanceof CaptureSnapshot) {
274             LOG.info("CaptureSnapshot received by actor");
275             CaptureSnapshot cs = (CaptureSnapshot)message;
276             captureSnapshot = cs;
277             createSnapshot();
278
279         } else if (message instanceof CaptureSnapshotReply){
280             LOG.info("CaptureSnapshotReply received by actor");
281             CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
282
283             ByteString stateInBytes = csr.getSnapshot();
284             LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
285             handleCaptureSnapshotReply(stateInBytes);
286
287         } else {
288             if (!(message instanceof AppendEntriesMessages.AppendEntries)
289                 && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) {
290                 if(LOG.isDebugEnabled()) {
291                     LOG.debug("onReceiveCommand: message:" + message.getClass());
292                 }
293             }
294
295             RaftState state =
296                 currentBehavior.handleMessage(getSender(), message);
297             RaftActorBehavior oldBehavior = currentBehavior;
298             currentBehavior = switchBehavior(state);
299             if(oldBehavior != currentBehavior){
300                 onStateChanged();
301             }
302
303             onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
304         }
305     }
306
307     public java.util.Set<String> getPeers() {
308         return context.getPeerAddresses().keySet();
309     }
310
311     protected String getReplicatedLogState() {
312         return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
313             + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
314             + ", im-mem journal size=" + context.getReplicatedLog().size();
315     }
316
317
318     /**
319      * When a derived RaftActor needs to persist something it must call
320      * persistData.
321      *
322      * @param clientActor
323      * @param identifier
324      * @param data
325      */
326     protected void persistData(ActorRef clientActor, String identifier,
327         Payload data) {
328
329         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
330             context.getReplicatedLog().lastIndex() + 1,
331             context.getTermInformation().getCurrentTerm(), data);
332
333         if(LOG.isDebugEnabled()) {
334             LOG.debug("Persist data {}", replicatedLogEntry);
335         }
336
337         replicatedLog
338             .appendAndPersist(clientActor, identifier, replicatedLogEntry);
339     }
340
341     protected String getId() {
342         return context.getId();
343     }
344
345     /**
346      * Derived actors can call the isLeader method to check if the current
347      * RaftActor is the Leader or not
348      *
349      * @return true it this RaftActor is a Leader false otherwise
350      */
351     protected boolean isLeader() {
352         return context.getId().equals(currentBehavior.getLeaderId());
353     }
354
355     /**
356      * Derived actor can call getLeader if they need a reference to the Leader.
357      * This would be useful for example in forwarding a request to an actor
358      * which is the leader
359      *
360      * @return A reference to the leader if known, null otherwise
361      */
362     protected ActorSelection getLeader(){
363         String leaderAddress = getLeaderAddress();
364
365         if(leaderAddress == null){
366             return null;
367         }
368
369         return context.actorSelection(leaderAddress);
370     }
371
372     /**
373      *
374      * @return the current leader's id
375      */
376     protected String getLeaderId(){
377         return currentBehavior.getLeaderId();
378     }
379
380     protected RaftState getRaftState() {
381         return currentBehavior.state();
382     }
383
384     protected ReplicatedLogEntry getLastLogEntry() {
385         return replicatedLog.last();
386     }
387
388     protected Long getCurrentTerm(){
389         return context.getTermInformation().getCurrentTerm();
390     }
391
392     protected Long getCommitIndex(){
393         return context.getCommitIndex();
394     }
395
396     protected Long getLastApplied(){
397         return context.getLastApplied();
398     }
399
400     /**
401      * setPeerAddress sets the address of a known peer at a later time.
402      * <p>
403      * This is to account for situations where a we know that a peer
404      * exists but we do not know an address up-front. This may also be used in
405      * situations where a known peer starts off in a different location and we
406      * need to change it's address
407      * <p>
408      * Note that if the peerId does not match the list of peers passed to
409      * this actor during construction an IllegalStateException will be thrown.
410      *
411      * @param peerId
412      * @param peerAddress
413      */
414     protected void setPeerAddress(String peerId, String peerAddress){
415         context.setPeerAddress(peerId, peerAddress);
416     }
417
418
419
420     /**
421      * The applyState method will be called by the RaftActor when some data
422      * needs to be applied to the actor's state
423      *
424      * @param clientActor A reference to the client who sent this message. This
425      *                    is the same reference that was passed to persistData
426      *                    by the derived actor. clientActor may be null when
427      *                    the RaftActor is behaving as a follower or during
428      *                    recovery.
429      * @param identifier  The identifier of the persisted data. This is also
430      *                    the same identifier that was passed to persistData by
431      *                    the derived actor. identifier may be null when
432      *                    the RaftActor is behaving as a follower or during
433      *                    recovery
434      * @param data        A piece of data that was persisted by the persistData call.
435      *                    This should NEVER be null.
436      */
437     protected abstract void applyState(ActorRef clientActor, String identifier,
438         Object data);
439
440     /**
441      * This method will be called by the RaftActor when a snapshot needs to be
442      * created. The derived actor should respond with its current state.
443      * <p/>
444      * During recovery the state that is returned by the derived actor will
445      * be passed back to it by calling the applySnapshot  method
446      *
447      * @return The current state of the actor
448      */
449     protected abstract void createSnapshot();
450
451     /**
452      * This method will be called by the RaftActor during recovery to
453      * reconstruct the state of the actor.
454      * <p/>
455      * This method may also be called at any other point during normal
456      * operations when the derived actor is out of sync with it's peers
457      * and the only way to bring it in sync is by applying a snapshot
458      *
459      * @param snapshot A snapshot of the state of the actor
460      */
461     protected abstract void applySnapshot(ByteString snapshot);
462
463     /**
464      * This method will be called by the RaftActor when the state of the
465      * RaftActor changes. The derived actor can then use methods like
466      * isLeader or getLeader to do something useful
467      */
468     protected abstract void onStateChanged();
469
470     protected void onLeaderChanged(String oldLeader, String newLeader){};
471
472     private RaftActorBehavior switchBehavior(RaftState state) {
473         if (currentBehavior != null) {
474             if (currentBehavior.state() == state) {
475                 return currentBehavior;
476             }
477             LOG.info("Switching from state " + currentBehavior.state() + " to "
478                 + state);
479
480             try {
481                 currentBehavior.close();
482             } catch (Exception e) {
483                 LOG.error(e,
484                     "Failed to close behavior : " + currentBehavior.state());
485             }
486
487         } else {
488             LOG.info("Switching behavior to " + state);
489         }
490         RaftActorBehavior behavior = null;
491         if (state == RaftState.Candidate) {
492             behavior = new Candidate(context);
493         } else if (state == RaftState.Follower) {
494             behavior = new Follower(context);
495         } else {
496             behavior = new Leader(context);
497         }
498
499
500
501         return behavior;
502     }
503
504     private void trimPersistentData(long sequenceNumber) {
505         // Trim akka snapshots
506         // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
507         // For now guessing that it is ANDed.
508         deleteSnapshots(new SnapshotSelectionCriteria(
509             sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
510
511         // Trim akka journal
512         deleteMessages(sequenceNumber);
513     }
514
515     private String getLeaderAddress(){
516         if(isLeader()){
517             return getSelf().path().toString();
518         }
519         String leaderId = currentBehavior.getLeaderId();
520         if (leaderId == null) {
521             return null;
522         }
523         String peerAddress = context.getPeerAddress(leaderId);
524         if(LOG.isDebugEnabled()) {
525             LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
526                 + peerAddress);
527         }
528
529         return peerAddress;
530     }
531
532     private void handleCaptureSnapshotReply(ByteString stateInBytes) {
533         // create a snapshot object from the state provided and save it
534         // when snapshot is saved async, SaveSnapshotSuccess is raised.
535
536         Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
537             context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
538             captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
539             captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
540
541         saveSnapshot(sn);
542
543         LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
544
545         //be greedy and remove entries from in-mem journal which are in the snapshot
546         // and update snapshotIndex and snapshotTerm without waiting for the success,
547
548         context.getReplicatedLog().snapshotPreCommit(stateInBytes,
549             captureSnapshot.getLastAppliedIndex(),
550             captureSnapshot.getLastAppliedTerm());
551
552         LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
553             "and term:{}", captureSnapshot.getLastAppliedIndex(),
554             captureSnapshot.getLastAppliedTerm());
555
556         captureSnapshot = null;
557         hasSnapshotCaptureInitiated = false;
558     }
559
560
561     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
562
563         public ReplicatedLogImpl(Snapshot snapshot) {
564             super(ByteString.copyFrom(snapshot.getState()),
565                 snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
566                 snapshot.getUnAppliedEntries());
567         }
568
569         public ReplicatedLogImpl() {
570             super();
571         }
572
573         @Override public void removeFromAndPersist(long logEntryIndex) {
574             int adjustedIndex = adjustedIndex(logEntryIndex);
575
576             if (adjustedIndex < 0) {
577                 return;
578             }
579
580             // FIXME: Maybe this should be done after the command is saved
581             journal.subList(adjustedIndex , journal.size()).clear();
582
583             persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
584
585                 @Override public void apply(DeleteEntries param)
586                     throws Exception {
587                     //FIXME : Doing nothing for now
588                 }
589             });
590         }
591
592         @Override public void appendAndPersist(
593             final ReplicatedLogEntry replicatedLogEntry) {
594             appendAndPersist(null, null, replicatedLogEntry);
595         }
596
597         public void appendAndPersist(final ActorRef clientActor,
598             final String identifier,
599             final ReplicatedLogEntry replicatedLogEntry) {
600             context.getLogger().debug(
601                 "Append log entry and persist {} ", replicatedLogEntry);
602             // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
603             journal.add(replicatedLogEntry);
604
605             // When persisting events with persist it is guaranteed that the
606             // persistent actor will not receive further commands between the
607             // persist call and the execution(s) of the associated event
608             // handler. This also holds for multiple persist calls in context
609             // of a single command.
610             persist(replicatedLogEntry,
611                 new Procedure<ReplicatedLogEntry>() {
612                     public void apply(ReplicatedLogEntry evt) throws Exception {
613                         // when a snaphsot is being taken, captureSnapshot != null
614                         if (hasSnapshotCaptureInitiated == false &&
615                             journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
616
617                             LOG.info("Initiating Snapshot Capture..");
618                             long lastAppliedIndex = -1;
619                             long lastAppliedTerm = -1;
620
621                             ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
622                             if (lastAppliedEntry != null) {
623                                 lastAppliedIndex = lastAppliedEntry.getIndex();
624                                 lastAppliedTerm = lastAppliedEntry.getTerm();
625                             }
626
627                             if(LOG.isDebugEnabled()) {
628                                 LOG.debug("Snapshot Capture logSize: {}", journal.size());
629                                 LOG.debug("Snapshot Capture lastApplied:{} ",
630                                     context.getLastApplied());
631                                 LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
632                                 LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
633                             }
634
635                             // send a CaptureSnapshot to self to make the expensive operation async.
636                             getSelf().tell(new CaptureSnapshot(
637                                 lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
638                                 null);
639                             hasSnapshotCaptureInitiated = true;
640                         }
641                         // Send message for replication
642                         if (clientActor != null) {
643                             currentBehavior.handleMessage(getSelf(),
644                                 new Replicate(clientActor, identifier,
645                                     replicatedLogEntry)
646                             );
647                         }
648                     }
649                 }
650             );
651         }
652
653     }
654
655     private static class DeleteEntries implements Serializable {
656         private final int fromIndex;
657
658
659         public DeleteEntries(int fromIndex) {
660             this.fromIndex = fromIndex;
661         }
662
663         public int getFromIndex() {
664             return fromIndex;
665         }
666     }
667
668
669     private class ElectionTermImpl implements ElectionTerm {
670         /**
671          * Identifier of the actor whose election term information this is
672          */
673         private long currentTerm = 0;
674         private String votedFor = null;
675
676         public long getCurrentTerm() {
677             return currentTerm;
678         }
679
680         public String getVotedFor() {
681             return votedFor;
682         }
683
684         @Override public void update(long currentTerm, String votedFor) {
685             if(LOG.isDebugEnabled()) {
686                 LOG.debug("Set currentTerm={}, votedFor={}", currentTerm, votedFor);
687             }
688             this.currentTerm = currentTerm;
689             this.votedFor = votedFor;
690         }
691
692         @Override
693         public void updateAndPersist(long currentTerm, String votedFor){
694             update(currentTerm, votedFor);
695             // FIXME : Maybe first persist then update the state
696             persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
697
698                 @Override public void apply(UpdateElectionTerm param)
699                     throws Exception {
700
701                 }
702             });
703         }
704     }
705
706     private static class UpdateElectionTerm implements Serializable {
707         private final long currentTerm;
708         private final String votedFor;
709
710         public UpdateElectionTerm(long currentTerm, String votedFor) {
711             this.currentTerm = currentTerm;
712             this.votedFor = votedFor;
713         }
714
715         public long getCurrentTerm() {
716             return currentTerm;
717         }
718
719         public String getVotedFor() {
720             return votedFor;
721         }
722     }
723
724 }