BUG 2437 - Enable snapshotting based on size of data in the in-memory journal
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
index b5b034afb9cf8edc7635cfca5509c93cbeb457b5..653520c2e47db4be19de53c9e3a42099904298d8 100644 (file)
@@ -16,22 +16,26 @@ import java.util.List;
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
-    protected final List<ReplicatedLogEntry> journal;
-    protected final Object snapshot;
+    // We define this as ArrayList so we can use ensureCapacity.
+    protected ArrayList<ReplicatedLogEntry> journal;
+
     protected long snapshotIndex = -1;
     protected long snapshotTerm = -1;
 
-    public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+    // to be used for rollback during save snapshot failure
+    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    protected long previousSnapshotIndex = -1;
+    protected long previousSnapshotTerm = -1;
+    protected int dataSize = 0;
+
+    public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
-        this.snapshot = state;
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.journal = new ArrayList<>(unAppliedEntries);
     }
 
-
     public AbstractReplicatedLogImpl() {
-        this.snapshot = null;
         this.journal = new ArrayList<>();
     }
 
@@ -98,6 +102,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         journal.add(replicatedLogEntry);
     }
 
+    @Override
+    public void increaseJournalLogCapacity(int amount) {
+        journal.ensureCapacity(journal.size() + amount);
+    }
+
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
         return getFrom(logEntryIndex, journal.size());
@@ -137,12 +146,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex;
-    }
-
-    @Override
-    public Object getSnapshot() {
-        return snapshot;
+        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
@@ -160,4 +164,54 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public abstract void removeFromAndPersist(long index);
+
+    @Override
+    public void setSnapshotIndex(long snapshotIndex) {
+        this.snapshotIndex = snapshotIndex;
+    }
+
+    @Override
+    public void setSnapshotTerm(long snapshotTerm) {
+        this.snapshotTerm = snapshotTerm;
+    }
+
+    @Override
+    public void clear(int startIndex, int endIndex) {
+        journal.subList(startIndex, endIndex).clear();
+    }
+
+    @Override
+    public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        snapshottedJournal = new ArrayList<>(journal.size());
+
+        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        previousSnapshotIndex = snapshotIndex;
+        setSnapshotIndex(snapshotCapturedIndex);
+
+        previousSnapshotTerm = snapshotTerm;
+        setSnapshotTerm(snapshotCapturedTerm);
+    }
+
+    @Override
+    public void snapshotCommit() {
+        snapshottedJournal = null;
+        previousSnapshotIndex = -1;
+        previousSnapshotTerm = -1;
+        dataSize = 0;
+    }
+
+    @Override
+    public void snapshotRollback() {
+        snapshottedJournal.addAll(journal);
+        journal = snapshottedJournal;
+        snapshottedJournal = null;
+
+        snapshotIndex = previousSnapshotIndex;
+        previousSnapshotIndex = -1;
+
+        snapshotTerm = previousSnapshotTerm;
+        previousSnapshotTerm = -1;
+    }
 }