*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
// We define this as ArrayList so we can use ensureCapacity.
- protected ArrayList<ReplicatedLogEntry> journal;
+ private ArrayList<ReplicatedLogEntry> journal;
private long snapshotIndex = -1;
private long snapshotTerm = -1;
private ArrayList<ReplicatedLogEntry> snapshottedJournal;
private long previousSnapshotIndex = -1;
private long previousSnapshotTerm = -1;
- protected int dataSize = 0;
+ private int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.journal = new ArrayList<>(unAppliedEntries);
+
+ for(ReplicatedLogEntry entry: journal) {
+ dataSize += entry.size();
+ }
}
public AbstractReplicatedLogImpl() {
}
protected int adjustedIndex(long logEntryIndex) {
- if(snapshotIndex < 0){
+ if (snapshotIndex < 0) {
return (int) logEntryIndex;
}
return (int) (logEntryIndex - (snapshotIndex + 1));
}
@Override
- public void removeFrom(long logEntryIndex) {
+ public long removeFrom(long logEntryIndex) {
int adjustedIndex = adjustedIndex(logEntryIndex);
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
// physical index should be less than list size and >= 0
- return;
+ return -1;
+ }
+
+ for(int i = adjustedIndex; i < journal.size(); i++) {
+ dataSize -= journal.get(i).size();
}
+
journal.subList(adjustedIndex , journal.size()).clear();
+
+ return adjustedIndex;
}
@Override
public void append(ReplicatedLogEntry replicatedLogEntry) {
journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
}
@Override
return journal.size();
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
@Override
public boolean isPresent(long logEntryIndex) {
if (logEntryIndex > lastIndex()) {
@Override
public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+ "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
snapshottedJournal = new ArrayList<>(journal.size());
- snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
- clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ snapshottedJournal.addAll(snapshotJournalEntries);
+ snapshotJournalEntries.clear();
previousSnapshotIndex = snapshotIndex;
setSnapshotIndex(snapshotCapturedIndex);
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
dataSize = 0;
+ // need to recalc the datasize based on the entries left after precommit.
+ for(ReplicatedLogEntry logEntry : journal) {
+ dataSize += logEntry.size();
+ }
+
}
@Override
snapshotTerm = previousSnapshotTerm;
previousSnapshotTerm = -1;
}
+
+ @VisibleForTesting
+ ReplicatedLogEntry getAtPhysicalIndex(int index) {
+ return journal.get(index);
+ }
}