X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FAbstractReplicatedLogImpl.java;h=64506ee6867fedd656b190f420be7ac7ec44c9b9;hp=c27ff373865069df16b08eae829e1722ff74a5f8;hb=ff29db5dc6012f77bbe53f57ddce929b0de093b3;hpb=68fb550b416dddd0a50e0110add0a4ae9b706758 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index c27ff37386..64506ee686 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -12,12 +12,18 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.eclipse.jdt.annotation.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract class handling the mapping of * logical LogEntry Index and the physical list index. */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class); + + private final String logContext; // We define this as ArrayList so we can use ensureCapacity. private ArrayList journal; @@ -31,22 +37,23 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private long previousSnapshotTerm = -1; private int dataSize = 0; - public AbstractReplicatedLogImpl(long snapshotIndex, - long snapshotTerm, List unAppliedEntries) { + protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm, + final List unAppliedEntries, final String logContext) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; - this.journal = new ArrayList<>(unAppliedEntries); + this.logContext = logContext; - for(ReplicatedLogEntry entry: journal) { - dataSize += entry.size(); + this.journal = new ArrayList<>(unAppliedEntries.size()); + for (ReplicatedLogEntry entry: unAppliedEntries) { + append(entry); } } - public AbstractReplicatedLogImpl() { - this(-1L, -1L, Collections.emptyList()); + protected AbstractReplicatedLogImpl() { + this(-1L, -1L, Collections.emptyList(), ""); } - protected int adjustedIndex(long logEntryIndex) { + protected int adjustedIndex(final long logEntryIndex) { if (snapshotIndex < 0) { return (int) logEntryIndex; } @@ -54,7 +61,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public ReplicatedLogEntry get(long logEntryIndex) { + public ReplicatedLogEntry get(final long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { @@ -95,14 +102,14 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public long removeFrom(long logEntryIndex) { + public long removeFrom(final long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { // physical index should be less than list size and >= 0 return -1; } - for(int i = adjustedIndex; i < journal.size(); i++) { + for (int i = adjustedIndex; i < journal.size(); i++) { dataSize -= journal.get(i).size(); } @@ -112,40 +119,75 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void append(ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - dataSize += replicatedLogEntry.size(); + public boolean append(final ReplicatedLogEntry replicatedLogEntry) { + if (replicatedLogEntry.getIndex() > lastIndex()) { + journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); + return true; + } else { + LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", + logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace")); + return false; + } } @Override - public void increaseJournalLogCapacity(int amount) { + public void increaseJournalLogCapacity(final int amount) { journal.ensureCapacity(journal.size() + amount); } @Override - public List getFrom(long logEntryIndex) { - return getFrom(logEntryIndex, journal.size()); + public List getFrom(final long logEntryIndex) { + return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE); } @Override - public List getFrom(long logEntryIndex, int max) { + public List getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - int maxIndex = adjustedIndex + max; - if(maxIndex > size){ + int maxIndex = adjustedIndex + maxEntries; + if (maxIndex > size) { maxIndex = size; } - return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + + if (maxDataSize == NO_MAX_SIZE) { + return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + } else { + return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize); + } } else { return Collections.emptyList(); } } + private @NonNull List copyJournalEntries(final int fromIndex, final int toIndex, + final long maxDataSize) { + List retList = new ArrayList<>(toIndex - fromIndex); + long totalSize = 0; + for (int i = fromIndex; i < toIndex; i++) { + ReplicatedLogEntry entry = journal.get(i); + totalSize += entry.size(); + if (totalSize <= maxDataSize) { + retList.add(entry); + } else { + if (retList.isEmpty()) { + // Edge case - the first entry's size exceeds the threshold. We need to return + // at least the first entry so add it here. + retList.add(entry); + } + + break; + } + } + + return retList; + } + @Override public long size() { - return journal.size(); + return journal.size(); } @Override @@ -154,18 +196,18 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public boolean isPresent(long logEntryIndex) { + public boolean isPresent(final long logEntryIndex) { if (logEntryIndex > lastIndex()) { // if the request logical index is less than the last present in the list return false; } int adjustedIndex = adjustedIndex(logEntryIndex); - return (adjustedIndex >= 0); + return adjustedIndex >= 0; } @Override - public boolean isInSnapshot(long logEntryIndex) { - return logEntryIndex <= snapshotIndex && snapshotIndex != -1; + public boolean isInSnapshot(final long logEntryIndex) { + return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1; } @Override @@ -179,37 +221,32 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry); - - @Override - public abstract void removeFromAndPersist(long index); - - @Override - public void setSnapshotIndex(long snapshotIndex) { + public void setSnapshotIndex(final long snapshotIndex) { this.snapshotIndex = snapshotIndex; } @Override - public void setSnapshotTerm(long snapshotTerm) { + public void setSnapshotTerm(final long snapshotTerm) { this.snapshotTerm = snapshotTerm; } @Override - public void clear(int startIndex, int endIndex) { + public void clear(final int startIndex, final int endIndex) { journal.subList(startIndex, endIndex).clear(); } @Override - public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { + public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) { Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex, "snapshotCapturedIndex must be greater than or equal to snapshotIndex"); snapshottedJournal = new ArrayList<>(journal.size()); - List snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + List snapshotJournalEntries = + journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); snapshottedJournal.addAll(snapshotJournalEntries); - clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); + snapshotJournalEntries.clear(); previousSnapshotIndex = snapshotIndex; setSnapshotIndex(snapshotCapturedIndex); @@ -219,16 +256,20 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void snapshotCommit() { + public void snapshotCommit(final boolean updateDataSize) { snapshottedJournal = null; previousSnapshotIndex = -1; previousSnapshotTerm = -1; - dataSize = 0; - // need to recalc the datasize based on the entries left after precommit. - for(ReplicatedLogEntry logEntry : journal) { - dataSize += logEntry.size(); - } + if (updateDataSize) { + // need to recalc the datasize based on the entries left after precommit. + int newDataSize = 0; + for (ReplicatedLogEntry logEntry : journal) { + newDataSize += logEntry.size(); + } + LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize); + dataSize = newDataSize; + } } @Override @@ -245,7 +286,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @VisibleForTesting - ReplicatedLogEntry getAtPhysicalIndex(int index) { + ReplicatedLogEntry getAtPhysicalIndex(final int index) { return journal.get(index); } }