+
+ @Override public void removeFromAndPersist(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+
+
+ }
+
+ @Override public void append(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ journal.add(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return entries;
+ }
+
+
+ for (int i = adjustedIndex;
+ i < journal.size(); i++) {
+ entries.add(journal.get(i));
+ }
+ return entries;
+ }
+
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist " + replicatedLogEntry);
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+ if (size() > 100000) {
+ ReplicatedLogEntry lastAppliedEntry =
+ get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ saveSnapshot(Snapshot.create(createSnapshot(),
+ getFrom(context.getLastApplied() + 1),
+ lastIndex(), lastTerm(), lastAppliedIndex,
+ lastAppliedTerm));
+ }
+ // Send message for replication
+ if (clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry)
+ );
+ }
+ }
+ }
+ );
+ }
+
+ @Override public long size() {
+ return journal.size() + snapshotIndex + 1;
+ }
+
+ @Override public boolean isPresent(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override public boolean isInSnapshot(long index) {
+ return index <= snapshotIndex;
+ }
+
+ @Override public Object getSnapshot() {
+ return snapshot;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ private int adjustedIndex(long index) {
+ if(snapshotIndex < 0){
+ return (int) index;
+ }
+ return (int) (index - snapshotIndex);
+ }
+ }
+
+
+ private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
+ Serializable {
+
+ private final long index;
+ private final long term;
+ private final Object payload;
+
+ public ReplicatedLogImplEntry(long index, long term, Object payload) {
+
+ this.index = index;
+ this.term = term;
+ this.payload = payload;
+ }
+
+ @Override public Object getData() {
+ return payload;
+ }
+
+ @Override public long getTerm() {
+ return term;
+ }
+
+ @Override public long getIndex() {
+ return index;
+ }
+
+ @Override public String toString() {
+ return "Entry{" +
+ "index=" + index +
+ ", term=" + term +
+ '}';
+ }
+ }
+
+ private static class DeleteEntries implements Serializable {
+ private final int fromIndex;
+
+
+ public DeleteEntries(int fromIndex) {
+ this.fromIndex = fromIndex;
+ }
+
+ public int getFromIndex() {
+ return fromIndex;
+ }
+ }
+
+
+ private static class Snapshot implements Serializable {
+ private final Object state;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
+ private final long lastIndex;
+ private final long lastTerm;
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+
+ private Snapshot(Object state,
+ List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+ long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ this.state = state;
+ this.unAppliedEntries = unAppliedEntries;
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+
+ public static Snapshot create(Object state,
+ List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm,
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ public Object getState() {
+ return state;
+ }
+
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;