X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FAbstractReplicatedLogImpl.java;h=c27ff373865069df16b08eae829e1722ff74a5f8;hp=24bfa3de21f0d6d8af306fda1e73a8f4638af160;hb=68fb550b416dddd0a50e0110add0a4ae9b706758;hpb=fcb707ea1dadb4775c80327adef7edac67ff9db0 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index 24bfa3de21..c27ff37386 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,7 +7,10 @@ */ package org.opendaylight.controller.cluster.raft; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -16,27 +19,35 @@ import java.util.List; */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { - protected final List journal; - protected final Object snapshot; - protected long snapshotIndex = -1; - protected long snapshotTerm = -1; + // We define this as ArrayList so we can use ensureCapacity. + private ArrayList journal; - public AbstractReplicatedLogImpl(Object state, long snapshotIndex, + private long snapshotIndex = -1; + private long snapshotTerm = -1; + + // to be used for rollback during save snapshot failure + private ArrayList snapshottedJournal; + private long previousSnapshotIndex = -1; + private long previousSnapshotTerm = -1; + private int dataSize = 0; + + public AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, List unAppliedEntries) { - this.snapshot = state; this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; this.journal = new ArrayList<>(unAppliedEntries); - } + for(ReplicatedLogEntry entry: journal) { + dataSize += entry.size(); + } + } public AbstractReplicatedLogImpl() { - this.snapshot = null; - this.journal = new ArrayList<>(); + this(-1L, -1L, Collections.emptyList()); } protected int adjustedIndex(long logEntryIndex) { - if(snapshotIndex < 0){ + if (snapshotIndex < 0) { return (int) logEntryIndex; } return (int) (logEntryIndex - (snapshotIndex + 1)); @@ -84,30 +95,52 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void removeFrom(long logEntryIndex) { + public long removeFrom(long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { // physical index should be less than list size and >= 0 - return; + return -1; + } + + for(int i = adjustedIndex; i < journal.size(); i++) { + dataSize -= journal.get(i).size(); } + journal.subList(adjustedIndex , journal.size()).clear(); + + return adjustedIndex; } @Override public void append(ReplicatedLogEntry replicatedLogEntry) { journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); + } + + @Override + public void increaseJournalLogCapacity(int amount) { + journal.ensureCapacity(journal.size() + amount); } @Override public List getFrom(long logEntryIndex) { + return getFrom(logEntryIndex, journal.size()); + } + + @Override + public List getFrom(long logEntryIndex, int max) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); - List entries = new ArrayList<>(100); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - entries.addAll(journal.subList(adjustedIndex, size)); + int maxIndex = adjustedIndex + max; + if(maxIndex > size){ + maxIndex = size; + } + return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + } else { + return Collections.emptyList(); } - return entries; } @Override @@ -115,6 +148,11 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return journal.size(); } + @Override + public int dataSize() { + return dataSize; + } + @Override public boolean isPresent(long logEntryIndex) { if (logEntryIndex > lastIndex()) { @@ -127,12 +165,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public boolean isInSnapshot(long logEntryIndex) { - return logEntryIndex <= snapshotIndex; - } - - @Override - public Object getSnapshot() { - return snapshot; + return logEntryIndex <= snapshotIndex && snapshotIndex != -1; } @Override @@ -150,4 +183,69 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public abstract void removeFromAndPersist(long index); + + @Override + public void setSnapshotIndex(long snapshotIndex) { + this.snapshotIndex = snapshotIndex; + } + + @Override + public void setSnapshotTerm(long snapshotTerm) { + this.snapshotTerm = snapshotTerm; + } + + @Override + public void clear(int startIndex, int endIndex) { + journal.subList(startIndex, endIndex).clear(); + } + + @Override + public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { + Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex, + "snapshotCapturedIndex must be greater than or equal to snapshotIndex"); + + snapshottedJournal = new ArrayList<>(journal.size()); + + List snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + snapshottedJournal.addAll(snapshotJournalEntries); + clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + previousSnapshotIndex = snapshotIndex; + setSnapshotIndex(snapshotCapturedIndex); + + previousSnapshotTerm = snapshotTerm; + setSnapshotTerm(snapshotCapturedTerm); + } + + @Override + public void snapshotCommit() { + snapshottedJournal = null; + previousSnapshotIndex = -1; + previousSnapshotTerm = -1; + dataSize = 0; + // need to recalc the datasize based on the entries left after precommit. + for(ReplicatedLogEntry logEntry : journal) { + dataSize += logEntry.size(); + } + + } + + @Override + public void snapshotRollback() { + snapshottedJournal.addAll(journal); + journal = snapshottedJournal; + snapshottedJournal = null; + + snapshotIndex = previousSnapshotIndex; + previousSnapshotIndex = -1; + + snapshotTerm = previousSnapshotTerm; + previousSnapshotTerm = -1; + } + + @VisibleForTesting + ReplicatedLogEntry getAtPhysicalIndex(int index) { + return journal.get(index); + } }