Force AbstractRaftRPC to use Externalizable proxy pattern
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / AbstractReplicatedLogImpl.java
index 653520c2e47db4be19de53c9e3a42099904298d8..86ba5ecc97aee9e7bbef8c103c08dcac6e5da36f 100644 (file)
@@ -7,40 +7,53 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract class handling the mapping of
  * logical LogEntry Index and the physical list index.
  */
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
+
+    private final String logContext;
 
     // We define this as ArrayList so we can use ensureCapacity.
-    protected ArrayList<ReplicatedLogEntry> journal;
+    private ArrayList<ReplicatedLogEntry> journal;
 
-    protected long snapshotIndex = -1;
-    protected long snapshotTerm = -1;
+    private long snapshotIndex = -1;
+    private long snapshotTerm = -1;
 
     // to be used for rollback during save snapshot failure
-    protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
-    protected long previousSnapshotIndex = -1;
-    protected long previousSnapshotTerm = -1;
-    protected int dataSize = 0;
+    private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+    private long previousSnapshotIndex = -1;
+    private long previousSnapshotTerm = -1;
+    private int dataSize = 0;
 
-    public AbstractReplicatedLogImpl(long snapshotIndex,
-        long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+    protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm,
+            List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
-        this.journal = new ArrayList<>(unAppliedEntries);
+        this.logContext = logContext;
+
+        this.journal = new ArrayList<>(unAppliedEntries.size());
+        for (ReplicatedLogEntry entry: unAppliedEntries) {
+            append(entry);
+        }
     }
 
-    public AbstractReplicatedLogImpl() {
-        this.journal = new ArrayList<>();
+    protected AbstractReplicatedLogImpl() {
+        this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
     }
 
     protected int adjustedIndex(long logEntryIndex) {
-        if(snapshotIndex < 0){
+        if (snapshotIndex < 0) {
             return (int) logEntryIndex;
         }
         return (int) (logEntryIndex - (snapshotIndex + 1));
@@ -88,18 +101,33 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     @Override
-    public void removeFrom(long logEntryIndex) {
+    public long removeFrom(long logEntryIndex) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
             // physical index should be less than list size and >= 0
-            return;
+            return -1;
         }
+
+        for (int i = adjustedIndex; i < journal.size(); i++) {
+            dataSize -= journal.get(i).size();
+        }
+
         journal.subList(adjustedIndex , journal.size()).clear();
+
+        return adjustedIndex;
     }
 
     @Override
-    public void append(ReplicatedLogEntry replicatedLogEntry) {
-        journal.add(replicatedLogEntry);
+    public boolean append(ReplicatedLogEntry replicatedLogEntry) {
+        if (replicatedLogEntry.getIndex() > lastIndex()) {
+            journal.add(replicatedLogEntry);
+            dataSize += replicatedLogEntry.size();
+            return true;
+        } else {
+            LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
+                    logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
+            return false;
+        }
     }
 
     @Override
@@ -109,29 +137,60 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
-        return getFrom(logEntryIndex, journal.size());
+        return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
     }
 
     @Override
-    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
+    public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         int size = journal.size();
-        List<ReplicatedLogEntry> entries = new ArrayList<>(100);
         if (adjustedIndex >= 0 && adjustedIndex < size) {
             // physical index should be less than list size and >= 0
-            int maxIndex = adjustedIndex + max;
-            if(maxIndex > size){
+            int maxIndex = adjustedIndex + maxEntries;
+            if (maxIndex > size) {
                 maxIndex = size;
             }
-            entries.addAll(journal.subList(adjustedIndex, maxIndex));
+
+            if (maxDataSize == NO_MAX_SIZE) {
+                return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+            } else {
+                return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
+            }
+        } else {
+            return Collections.emptyList();
         }
-        return entries;
     }
 
+    private List<ReplicatedLogEntry> copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) {
+        List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
+        long totalSize = 0;
+        for (int i = fromIndex; i < toIndex; i++) {
+            ReplicatedLogEntry entry = journal.get(i);
+            totalSize += entry.size();
+            if (totalSize <= maxDataSize) {
+                retList.add(entry);
+            } else {
+                if (retList.isEmpty()) {
+                    // Edge case - the first entry's size exceeds the threshold. We need to return
+                    // at least the first entry so add it here.
+                    retList.add(entry);
+                }
+
+                break;
+            }
+        }
+
+        return retList;
+    }
 
     @Override
     public long size() {
-       return journal.size();
+        return journal.size();
+    }
+
+    @Override
+    public int dataSize() {
+        return dataSize;
     }
 
     @Override
@@ -141,12 +200,12 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
             return false;
         }
         int adjustedIndex = adjustedIndex(logEntryIndex);
-        return (adjustedIndex >= 0);
+        return adjustedIndex >= 0;
     }
 
     @Override
     public boolean isInSnapshot(long logEntryIndex) {
-        return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
+        return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
     }
 
     @Override
@@ -159,12 +218,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         return snapshotTerm;
     }
 
-    @Override
-    public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
-
-    @Override
-    public abstract void removeFromAndPersist(long index);
-
     @Override
     public void setSnapshotIndex(long snapshotIndex) {
         this.snapshotIndex = snapshotIndex;
@@ -182,10 +235,16 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     @Override
     public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+        Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+                "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
         snapshottedJournal = new ArrayList<>(journal.size());
 
-        snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
-        clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+        List<ReplicatedLogEntry> snapshotJournalEntries =
+                journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+        snapshottedJournal.addAll(snapshotJournalEntries);
+        snapshotJournalEntries.clear();
 
         previousSnapshotIndex = snapshotIndex;
         setSnapshotIndex(snapshotCapturedIndex);
@@ -200,6 +259,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
         dataSize = 0;
+        // need to recalc the datasize based on the entries left after precommit.
+        for (ReplicatedLogEntry logEntry : journal) {
+            dataSize += logEntry.size();
+        }
+
     }
 
     @Override
@@ -214,4 +278,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshotTerm = previousSnapshotTerm;
         previousSnapshotTerm = -1;
     }
+
+    @VisibleForTesting
+    ReplicatedLogEntry getAtPhysicalIndex(int index) {
+        return journal.get(index);
+    }
 }