+ if (journal.size() == 0) {
+ return null;
+ }
+ return get(journal.size() - 1);
+ }
+
+ @Override public long lastIndex() {
+ if (journal.size() == 0) {
+ return -1;
+ }
+
+ return last().getIndex();
+ }
+
+ @Override public long lastTerm() {
+ if (journal.size() == 0) {
+ return -1;
+ }
+
+ return last().getTerm();
+ }
+
+
+ @Override public void removeFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
+ }
+
+ journal.subList(adjustedIndex , journal.size()).clear();
+ }
+
+
+ @Override public void removeFromAndPersist(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return;
+ }
+
+ // FIXME: Maybe this should be done after the command is saved
+ journal.subList(adjustedIndex , journal.size()).clear();
+
+ persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+
+ @Override public void apply(DeleteEntries param)
+ throws Exception {
+ //FIXME : Doing nothing for now
+ }
+ });
+
+
+ }
+
+ @Override public void append(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ journal.add(replicatedLogEntry);
+ }
+
+ @Override public List<ReplicatedLogEntry> getFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ List<ReplicatedLogEntry> entries = new ArrayList<>(100);
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return entries;
+ }
+
+
+ for (int i = adjustedIndex;
+ i < journal.size(); i++) {
+ entries.add(journal.get(i));
+ }
+ return entries;
+ }
+
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
+ appendAndPersist(null, null, replicatedLogEntry);
+ }
+
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist {} ", replicatedLogEntry);
+ // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
+ journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
+ persist(replicatedLogEntry,
+ new Procedure<ReplicatedLogEntry>() {
+ public void apply(ReplicatedLogEntry evt) throws Exception {
+ // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+ if (size() > 100000) {
+ ReplicatedLogEntry lastAppliedEntry =
+ get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ saveSnapshot(Snapshot.create(createSnapshot(),
+ getFrom(context.getLastApplied() + 1),
+ lastIndex(), lastTerm(), lastAppliedIndex,
+ lastAppliedTerm));
+ }
+ // Send message for replication
+ if (clientActor != null) {
+ currentBehavior.handleMessage(getSelf(),
+ new Replicate(clientActor, identifier,
+ replicatedLogEntry)
+ );
+ }
+ }
+ }
+ );
+ }
+
+ @Override public long size() {
+ return journal.size() + snapshotIndex + 1;
+ }
+
+ @Override public boolean isPresent(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override public boolean isInSnapshot(long index) {
+ return index <= snapshotIndex;
+ }
+
+ @Override public Object getSnapshot() {
+ return snapshot;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ private int adjustedIndex(long index) {
+ if(snapshotIndex < 0){
+ return (int) index;
+ }
+ return (int) (index - snapshotIndex);