+ private void trimPersistentData(long sequenceNumber) {
+ // Trim akka snapshots
+ // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
+ // For now guessing that it is ANDed.
+ deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ // Trim akka journal
+ deleteMessages(sequenceNumber);
+ }
+
+ private String getLeaderAddress(){
+ if(isLeader()){
+ return getSelf().path().toString();
+ }
+ String leaderId = currentBehavior.getLeaderId();
+ if (leaderId == null) {
+ return null;
+ }
+ String peerAddress = context.getPeerAddress(leaderId);
+ LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
+
+ return peerAddress;
+ }
+
+ private void handleCaptureSnapshotReply(ByteString stateInBytes) {
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+
+ Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
+ context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+ captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
+ captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
+
+ saveSnapshot(sn);
+
+ LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+ //be greedy and remove entries from in-mem journal which are in the snapshot
+ // and update snapshotIndex and snapshotTerm without waiting for the success,
+
+ context.getReplicatedLog().snapshotPreCommit(stateInBytes,
+ captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
+ "and term:{}", captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
+
+ captureSnapshot = null;
+ hasSnapshotCaptureInitiated = false;
+ }
+
+
+ private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+
+ public ReplicatedLogImpl(Snapshot snapshot) {
+ super(ByteString.copyFrom(snapshot.getState()),
+ snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ snapshot.getUnAppliedEntries());
+ }
+
+ public ReplicatedLogImpl() {
+ super();
+ }
+
+ @Override public void removeFromAndPersist(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+
+ if (adjustedIndex < 0) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+ }
+
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist {} ", replicatedLogEntry);
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // when a snaphsot is being taken, captureSnapshot != null
+ if (hasSnapshotCaptureInitiated == false &&
+ journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+
+ LOG.info("Initiating Snapshot Capture..");
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+
+ // send a CaptureSnapshot to self to make the expensive operation async.
+ getSelf().tell(new CaptureSnapshot(
+ lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+ null);
+ hasSnapshotCaptureInitiated = true;
+ }
+ // Send message for replication
+ if (clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry)
+ );
+ }
+ }
+ }
+ );
+ }
+
+ }
+
+ private static class DeleteEntries implements Serializable {
+ private final int fromIndex;
+