X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FAbstractReplicatedLogImpl.java;h=86ba5ecc97aee9e7bbef8c103c08dcac6e5da36f;hp=b436bce50061f98d0362ce3df4336c4279977d63;hb=4a3ba6c6695119ba041f358fca281b582c7665f1;hpb=43fbc0b14b21dc32ed8a14128453dd1581920f5a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b436bce500..86ba5ecc97 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -7,44 +7,53 @@ */ package org.opendaylight.controller.cluster.raft; -import com.google.protobuf.ByteString; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract class handling the mapping of * logical LogEntry Index and the physical list index. */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class); + + private final String logContext; + + // We define this as ArrayList so we can use ensureCapacity. + private ArrayList journal; - protected List journal; - protected ByteString snapshot; - protected long snapshotIndex = -1; - protected long snapshotTerm = -1; + private long snapshotIndex = -1; + private long snapshotTerm = -1; // to be used for rollback during save snapshot failure - protected List snapshottedJournal; - protected ByteString previousSnapshot; - protected long previousSnapshotIndex = -1; - protected long previousSnapshotTerm = -1; - - public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex, - long snapshotTerm, List unAppliedEntries) { - this.snapshot = state; + private ArrayList snapshottedJournal; + private long previousSnapshotIndex = -1; + private long previousSnapshotTerm = -1; + private int dataSize = 0; + + protected AbstractReplicatedLogImpl(long snapshotIndex, long snapshotTerm, + List unAppliedEntries, String logContext) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; - this.journal = new ArrayList<>(unAppliedEntries); - } + this.logContext = logContext; + this.journal = new ArrayList<>(unAppliedEntries.size()); + for (ReplicatedLogEntry entry: unAppliedEntries) { + append(entry); + } + } - public AbstractReplicatedLogImpl() { - this.snapshot = null; - this.journal = new ArrayList<>(); + protected AbstractReplicatedLogImpl() { + this(-1L, -1L, Collections.emptyList(), ""); } protected int adjustedIndex(long logEntryIndex) { - if(snapshotIndex < 0){ + if (snapshotIndex < 0) { return (int) logEntryIndex; } return (int) (logEntryIndex - (snapshotIndex + 1)); @@ -92,45 +101,96 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void removeFrom(long logEntryIndex) { + public long removeFrom(long logEntryIndex) { int adjustedIndex = adjustedIndex(logEntryIndex); if (adjustedIndex < 0 || adjustedIndex >= journal.size()) { // physical index should be less than list size and >= 0 - return; + return -1; + } + + for (int i = adjustedIndex; i < journal.size(); i++) { + dataSize -= journal.get(i).size(); } + journal.subList(adjustedIndex , journal.size()).clear(); + + return adjustedIndex; } @Override - public void append(ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); + public boolean append(ReplicatedLogEntry replicatedLogEntry) { + if (replicatedLogEntry.getIndex() > lastIndex()) { + journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); + return true; + } else { + LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", + logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace")); + return false; + } + } + + @Override + public void increaseJournalLogCapacity(int amount) { + journal.ensureCapacity(journal.size() + amount); } @Override public List getFrom(long logEntryIndex) { - return getFrom(logEntryIndex, journal.size()); + return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE); } @Override - public List getFrom(long logEntryIndex, int max) { + public List getFrom(long logEntryIndex, int maxEntries, long maxDataSize) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); - List entries = new ArrayList<>(100); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - int maxIndex = adjustedIndex + max; - if(maxIndex > size){ + int maxIndex = adjustedIndex + maxEntries; + if (maxIndex > size) { maxIndex = size; } - entries.addAll(journal.subList(adjustedIndex, maxIndex)); + + if (maxDataSize == NO_MAX_SIZE) { + return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + } else { + return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize); + } + } else { + return Collections.emptyList(); } - return entries; } + private List copyJournalEntries(int fromIndex, int toIndex, long maxDataSize) { + List retList = new ArrayList<>(toIndex - fromIndex); + long totalSize = 0; + for (int i = fromIndex; i < toIndex; i++) { + ReplicatedLogEntry entry = journal.get(i); + totalSize += entry.size(); + if (totalSize <= maxDataSize) { + retList.add(entry); + } else { + if (retList.isEmpty()) { + // Edge case - the first entry's size exceeds the threshold. We need to return + // at least the first entry so add it here. + retList.add(entry); + } + + break; + } + } + + return retList; + } @Override public long size() { - return journal.size(); + return journal.size(); + } + + @Override + public int dataSize() { + return dataSize; } @Override @@ -140,17 +200,12 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return false; } int adjustedIndex = adjustedIndex(logEntryIndex); - return (adjustedIndex >= 0); + return adjustedIndex >= 0; } @Override public boolean isInSnapshot(long logEntryIndex) { - return logEntryIndex <= snapshotIndex && snapshotIndex != -1; - } - - @Override - public ByteString getSnapshot() { - return snapshot; + return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1; } @Override @@ -163,12 +218,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return snapshotTerm; } - @Override - public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry); - - @Override - public abstract void removeFromAndPersist(long index); - @Override public void setSnapshotIndex(long snapshotIndex) { this.snapshotIndex = snapshotIndex; @@ -179,46 +228,47 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { this.snapshotTerm = snapshotTerm; } - @Override - public void setSnapshot(ByteString snapshot) { - this.snapshot = snapshot; - } - @Override public void clear(int startIndex, int endIndex) { journal.subList(startIndex, endIndex).clear(); } @Override - public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) { + public void snapshotPreCommit(long snapshotCapturedIndex, long snapshotCapturedTerm) { + Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex, + "snapshotCapturedIndex must be greater than or equal to snapshotIndex"); + snapshottedJournal = new ArrayList<>(journal.size()); - snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex))); - clear(0, (int) (snapshotCapturedIndex - snapshotIndex)); + List snapshotJournalEntries = + journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex)); + + snapshottedJournal.addAll(snapshotJournalEntries); + snapshotJournalEntries.clear(); previousSnapshotIndex = snapshotIndex; setSnapshotIndex(snapshotCapturedIndex); previousSnapshotTerm = snapshotTerm; setSnapshotTerm(snapshotCapturedTerm); - - previousSnapshot = getSnapshot(); - setSnapshot(snapshot); } @Override public void snapshotCommit() { - snapshottedJournal.clear(); snapshottedJournal = null; previousSnapshotIndex = -1; previousSnapshotTerm = -1; - previousSnapshot = null; + dataSize = 0; + // need to recalc the datasize based on the entries left after precommit. + for (ReplicatedLogEntry logEntry : journal) { + dataSize += logEntry.size(); + } + } @Override public void snapshotRollback() { snapshottedJournal.addAll(journal); - journal.clear(); journal = snapshottedJournal; snapshottedJournal = null; @@ -227,9 +277,10 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { snapshotTerm = previousSnapshotTerm; previousSnapshotTerm = -1; + } - snapshot = previousSnapshot; - previousSnapshot = null; - + @VisibleForTesting + ReplicatedLogEntry getAtPhysicalIndex(int index) { + return journal.get(index); } }