*/
package org.opendaylight.controller.cluster.raft;
+import com.google.protobuf.ByteString;
+
import java.util.ArrayList;
import java.util.List;
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
- protected final List<ReplicatedLogEntry> journal;
- protected final Object snapshot;
+ protected List<ReplicatedLogEntry> journal;
+ protected ByteString snapshot;
protected long snapshotIndex = -1;
protected long snapshotTerm = -1;
- public AbstractReplicatedLogImpl(Object state, long snapshotIndex,
+ // to be used for rollback during save snapshot failure
+ protected List<ReplicatedLogEntry> snapshottedJournal;
+ protected ByteString previousSnapshot;
+ protected long previousSnapshotIndex = -1;
+ protected long previousSnapshotTerm = -1;
+
+ public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
this.snapshot = state;
this.snapshotIndex = snapshotIndex;
@Override
public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
+ return getFrom(logEntryIndex, journal.size());
+ }
+
+ @Override
+ public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
- entries.addAll(journal.subList(adjustedIndex, size));
+ int maxIndex = adjustedIndex + max;
+ if(maxIndex > size){
+ maxIndex = size;
+ }
+ entries.addAll(journal.subList(adjustedIndex, maxIndex));
}
return entries;
}
+
@Override
public long size() {
return journal.size();
@Override
public boolean isInSnapshot(long logEntryIndex) {
- return logEntryIndex <= snapshotIndex;
+ return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
@Override
- public Object getSnapshot() {
+ public ByteString getSnapshot() {
return snapshot;
}
@Override
public abstract void removeFromAndPersist(long index);
+
+ @Override
+ public void setSnapshotIndex(long snapshotIndex) {
+ this.snapshotIndex = snapshotIndex;
+ }
+
+ @Override
+ public void setSnapshotTerm(long snapshotTerm) {
+ this.snapshotTerm = snapshotTerm;
+ }
+
+ @Override
+ public void setSnapshot(ByteString snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public void clear(int startIndex, int endIndex) {
+ journal.subList(startIndex, endIndex).clear();
+ }
+
+ @Override
+ public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ snapshottedJournal = new ArrayList<>(journal.size());
+
+ snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
+ clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ previousSnapshotIndex = snapshotIndex;
+ setSnapshotIndex(snapshotCapturedIndex);
+
+ previousSnapshotTerm = snapshotTerm;
+ setSnapshotTerm(snapshotCapturedTerm);
+
+ previousSnapshot = getSnapshot();
+ setSnapshot(snapshot);
+ }
+
+ @Override
+ public void snapshotCommit() {
+ snapshottedJournal.clear();
+ snapshottedJournal = null;
+ previousSnapshotIndex = -1;
+ previousSnapshotTerm = -1;
+ previousSnapshot = null;
+ }
+
+ @Override
+ public void snapshotRollback() {
+ snapshottedJournal.addAll(journal);
+ journal.clear();
+ journal = snapshottedJournal;
+ snapshottedJournal = null;
+
+ snapshotIndex = previousSnapshotIndex;
+ previousSnapshotIndex = -1;
+
+ snapshotTerm = previousSnapshotTerm;
+ previousSnapshotTerm = -1;
+
+ snapshot = previousSnapshot;
+ previousSnapshot = null;
+
+ }
}