*/
package org.opendaylight.controller.cluster.raft;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
- protected final List<ReplicatedLogEntry> journal;
- protected final Object snapshot;
- protected long snapshotIndex = -1;
- protected long snapshotTerm = -1;
+ // We define this as ArrayList so we can use ensureCapacity.
+ protected ArrayList<ReplicatedLogEntry> journal;
- public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
+
+ // to be used for rollback during save snapshot failure
+ private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+ private long previousSnapshotIndex = -1;
+ private long previousSnapshotTerm = -1;
+ protected int dataSize = 0;
+
+ public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
- this.snapshot = state;
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
this.journal = new ArrayList<>(unAppliedEntries);
}
-
public AbstractReplicatedLogImpl() {
- this.snapshot = null;
- this.journal = new ArrayList<>();
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
protected int adjustedIndex(long logEntryIndex) {
- if(snapshotIndex < 0){
+ if (snapshotIndex < 0) {
return (int) logEntryIndex;
}
return (int) (logEntryIndex - (snapshotIndex + 1));
journal.add(replicatedLogEntry);
}
+ @Override
+ public void increaseJournalLogCapacity(int amount) {
+ journal.ensureCapacity(journal.size() + amount);
+ }
+
@Override
public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
return getFrom(logEntryIndex, journal.size());
public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + max;
if(maxIndex > size){
maxIndex = size;
}
- entries.addAll(journal.subList(adjustedIndex, maxIndex));
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ return Collections.emptyList();
}
- return entries;
}
-
@Override
public long size() {
return journal.size();
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
@Override
public boolean isPresent(long logEntryIndex) {
if (logEntryIndex > lastIndex()) {
@Override
public boolean isInSnapshot(long logEntryIndex) {
- return logEntryIndex <= snapshotIndex;
- }
-
- @Override
- public Object getSnapshot() {
- return snapshot;
+ return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
@Override
@Override
public abstract void removeFromAndPersist(long index);
+
+ @Override
+ public void setSnapshotIndex(long snapshotIndex) {
+ this.snapshotIndex = snapshotIndex;
+ }
+
+ @Override
+ public void setSnapshotTerm(long snapshotTerm) {
+ this.snapshotTerm = snapshotTerm;
+ }
+
+ @Override
+ public void clear(int startIndex, int endIndex) {
+ journal.subList(startIndex, endIndex).clear();
+ }
+
+ @Override
+ public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+ "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
+ snapshottedJournal = new ArrayList<>(journal.size());
+
+ List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ snapshottedJournal.addAll(snapshotJournalEntries);
+ clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ previousSnapshotIndex = snapshotIndex;
+ setSnapshotIndex(snapshotCapturedIndex);
+
+ previousSnapshotTerm = snapshotTerm;
+ setSnapshotTerm(snapshotCapturedTerm);
+ }
+
+ @Override
+ public void snapshotCommit() {
+ snapshottedJournal = null;
+ previousSnapshotIndex = -1;
+ previousSnapshotTerm = -1;
+ dataSize = 0;
+ // need to recalc the datasize based on the entries left after precommit.
+ for(ReplicatedLogEntry logEntry : journal) {
+ dataSize += logEntry.size();
+ }
+
+ }
+
+ @Override
+ public void snapshotRollback() {
+ snapshottedJournal.addAll(journal);
+ journal = snapshottedJournal;
+ snapshottedJournal = null;
+
+ snapshotIndex = previousSnapshotIndex;
+ previousSnapshotIndex = -1;
+
+ snapshotTerm = previousSnapshotTerm;
+ previousSnapshotTerm = -1;
+ }
}