X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FAbstractReplicatedLogImpl.java;h=69b78458b61b62ffb6668653a4ae63019912f6a5;hp=b4b2afbc4ad602ccd1bc9f50da8641cd0b05f605;hb=a0b8be5ce48c0d1e0b573d1952211913c58d4935;hpb=2727bea09c83646b6cbd2ef9672d0b7f6cf3b22f diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java index b4b2afbc4a..69b78458b6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java @@ -12,12 +12,17 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract class handling the mapping of * logical LogEntry Index and the physical list index. */ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class); + + private final String logContext; // We define this as ArrayList so we can use ensureCapacity. private ArrayList journal; @@ -32,18 +37,19 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { private int dataSize = 0; public AbstractReplicatedLogImpl(long snapshotIndex, - long snapshotTerm, List unAppliedEntries) { + long snapshotTerm, List unAppliedEntries, String logContext) { this.snapshotIndex = snapshotIndex; this.snapshotTerm = snapshotTerm; - this.journal = new ArrayList<>(unAppliedEntries); + this.logContext = logContext; - for(ReplicatedLogEntry entry: journal) { - dataSize += entry.size(); + this.journal = new ArrayList<>(unAppliedEntries.size()); + for(ReplicatedLogEntry entry: unAppliedEntries) { + append(entry); } } public AbstractReplicatedLogImpl() { - this(-1L, -1L, Collections.emptyList()); + this(-1L, -1L, Collections.emptyList(), ""); } protected int adjustedIndex(long logEntryIndex) { @@ -112,9 +118,16 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { } @Override - public void append(ReplicatedLogEntry replicatedLogEntry) { - journal.add(replicatedLogEntry); - dataSize += replicatedLogEntry.size(); + public boolean append(ReplicatedLogEntry replicatedLogEntry) { + if(replicatedLogEntry.getIndex() > lastIndex()) { + journal.add(replicatedLogEntry); + dataSize += replicatedLogEntry.size(); + return true; + } else { + LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}", + logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace")); + return false; + } } @Override @@ -124,20 +137,43 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { @Override public List getFrom(long logEntryIndex) { - return getFrom(logEntryIndex, journal.size()); + return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE); } @Override - public List getFrom(long logEntryIndex, int max) { + public List getFrom(long logEntryIndex, int maxEntries, long maxDataSize) { int adjustedIndex = adjustedIndex(logEntryIndex); int size = journal.size(); if (adjustedIndex >= 0 && adjustedIndex < size) { // physical index should be less than list size and >= 0 - int maxIndex = adjustedIndex + max; + int maxIndex = adjustedIndex + maxEntries; if(maxIndex > size){ maxIndex = size; } - return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + + if(maxDataSize == NO_MAX_SIZE) { + return new ArrayList<>(journal.subList(adjustedIndex, maxIndex)); + } else { + List retList = new ArrayList<>(maxIndex - adjustedIndex); + long totalSize = 0; + for(int i = adjustedIndex; i < maxIndex; i++) { + ReplicatedLogEntry entry = journal.get(i); + totalSize += entry.size(); + if(totalSize <= maxDataSize) { + retList.add(entry); + } else { + if(retList.isEmpty()) { + // Edge case - the first entry's size exceeds the threshold. We need to return + // at least the first entry so add it here. + retList.add(entry); + } + + break; + } + } + + return retList; + } } else { return Collections.emptyList(); } @@ -178,12 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog { return snapshotTerm; } - @Override - public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry); - - @Override - public abstract void removeFromAndPersist(long index); - @Override public void setSnapshotIndex(long snapshotIndex) { this.snapshotIndex = snapshotIndex;