+ private void trimPersistentData(long sequenceNumber) {
+ // Trim akka snapshots
+ // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
+ // For now guessing that it is ANDed.
+ deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+
+ // Trim akka journal
+ deleteMessages(sequenceNumber);
+ }
+
+
+ private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
+
+ public ReplicatedLogImpl(Snapshot snapshot) {
+ super(snapshot.getState(),
+ snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
+ snapshot.getUnAppliedEntries());
+ }
+
+ public ReplicatedLogImpl() {
+ super();
+ }
+
+ @Override public void removeFromAndPersist(long logEntryIndex) {
+ int adjustedIndex = adjustedIndex(logEntryIndex);
+
+ if (adjustedIndex < 0) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+ }
+
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist {} ", replicatedLogEntry);
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+ if (journal.size() > context.getConfigParams().getSnapshotBatchCount()) {
+ LOG.info("Initiating Snapshot Capture..");
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+
+ ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ LOG.debug("Snapshot Capture logSize: {}", journal.size());
+ LOG.debug("Snapshot Capture lastApplied:{} ", context.getLastApplied());
+ LOG.debug("Snapshot Capture lastAppliedIndex:{}", lastAppliedIndex);
+ LOG.debug("Snapshot Capture lastAppliedTerm:{}", lastAppliedTerm);
+
+ // create a snapshot object from the state provided and save it
+ // when snapshot is saved async, SaveSnapshotSuccess is raised.
+ Snapshot sn = Snapshot.create(createSnapshot(),
+ getFrom(context.getLastApplied() + 1),
+ lastIndex(), lastTerm(), lastAppliedIndex,
+ lastAppliedTerm);
+ saveSnapshot(sn);
+
+ LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
+
+ //be greedy and remove entries from in-mem journal which are in the snapshot
+ // and update snapshotIndex and snapshotTerm without waiting for the success,
+ // TODO: damage-recovery to be done on failure
+ journal.subList(0, (int) (lastAppliedIndex - snapshotIndex)).clear();
+ snapshotIndex = lastAppliedIndex;
+ snapshotTerm = lastAppliedTerm;
+
+ LOG.info("Removed in-memory snapshotted entries, " +
+ "adjusted snaphsotIndex:{}" +
+ "and term:{}", snapshotIndex, lastAppliedTerm);
+ }
+ // Send message for replication
+ if (clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry)
+ );
+ }
+ }
+ }
+ );
+ }
+
+ }
+
+ private static class DeleteEntries implements Serializable {
+ private final int fromIndex;
+
+
+ public DeleteEntries(int fromIndex) {
+ this.fromIndex = fromIndex;
+ }
+
+ public int getFromIndex() {
+ return fromIndex;
+ }
+ }
+
+
+ private static class Snapshot implements Serializable {
+ private final Object state;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
+ private final long lastIndex;
+ private final long lastTerm;
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+
+ private Snapshot(Object state,
+ List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+ long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ this.state = state;
+ this.unAppliedEntries = unAppliedEntries;
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+
+ public static Snapshot create(Object state,
+ List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm,
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ public Object getState() {
+ return state;
+ }
+
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+
+ public String getLogMessage() {
+ StringBuilder sb = new StringBuilder();
+ return sb.append("Snapshot={")
+ .append("lastTerm:" + this.getLastTerm() + ", ")
+ .append("LastAppliedIndex:" + this.getLastAppliedIndex() + ", ")
+ .append("LastAppliedTerm:" + this.getLastAppliedTerm() + ", ")
+ .append("UnAppliedEntries size:" + this.getUnAppliedEntries().size() + "}")
+ .toString();
+
+ }
+ }
+
+ private class ElectionTermImpl implements ElectionTerm {
+ /**
+ * Identifier of the actor whose election term information this is
+ */
+ private long currentTerm = 0;
+ private String votedFor = null;
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ @Override public void update(long currentTerm, String votedFor) {
+ LOG.info("Set currentTerm={}, votedFor={}", currentTerm, votedFor);