*/
package org.opendaylight.controller.cluster.raft;
-import com.google.protobuf.ByteString;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import org.eclipse.jdt.annotation.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstract class handling the mapping of
* logical LogEntry Index and the physical list index.
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
+
+ private final String logContext;
// We define this as ArrayList so we can use ensureCapacity.
- protected ArrayList<ReplicatedLogEntry> journal;
- protected ByteString snapshot;
- protected long snapshotIndex = -1;
- protected long snapshotTerm = -1;
+ private ArrayList<ReplicatedLogEntry> journal;
+
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
- protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
- protected ByteString previousSnapshot;
- protected long previousSnapshotIndex = -1;
- protected long previousSnapshotTerm = -1;
-
- public AbstractReplicatedLogImpl(ByteString state, long snapshotIndex,
- long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
- this.snapshot = state;
+ private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+ private long previousSnapshotIndex = -1;
+ private long previousSnapshotTerm = -1;
+ private int dataSize = 0;
+
+ protected AbstractReplicatedLogImpl(final long snapshotIndex, final long snapshotTerm,
+ final List<ReplicatedLogEntry> unAppliedEntries, final String logContext) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
- this.journal = new ArrayList<>(unAppliedEntries);
- }
+ this.logContext = logContext;
+ journal = new ArrayList<>(unAppliedEntries.size());
+ for (ReplicatedLogEntry entry: unAppliedEntries) {
+ append(entry);
+ }
+ }
- public AbstractReplicatedLogImpl() {
- this.snapshot = null;
- this.journal = new ArrayList<>();
+ protected AbstractReplicatedLogImpl() {
+ this(-1L, -1L, Collections.emptyList(), "");
}
- protected int adjustedIndex(long logEntryIndex) {
- if(snapshotIndex < 0){
+ protected int adjustedIndex(final long logEntryIndex) {
+ if (snapshotIndex < 0) {
return (int) logEntryIndex;
}
return (int) (logEntryIndex - (snapshotIndex + 1));
}
@Override
- public ReplicatedLogEntry get(long logEntryIndex) {
+ public ReplicatedLogEntry get(final long logEntryIndex) {
int adjustedIndex = adjustedIndex(logEntryIndex);
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
}
@Override
- public void removeFrom(long logEntryIndex) {
+ public long removeFrom(final long logEntryIndex) {
int adjustedIndex = adjustedIndex(logEntryIndex);
if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
// physical index should be less than list size and >= 0
- return;
+ return -1;
}
+
+ for (int i = adjustedIndex; i < journal.size(); i++) {
+ dataSize -= journal.get(i).size();
+ }
+
journal.subList(adjustedIndex , journal.size()).clear();
+
+ return adjustedIndex;
}
@Override
- public void append(ReplicatedLogEntry replicatedLogEntry) {
- journal.add(replicatedLogEntry);
+ public boolean append(final ReplicatedLogEntry replicatedLogEntry) {
+ if (replicatedLogEntry.getIndex() > lastIndex()) {
+ journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
+ return true;
+ } else {
+ LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
+ logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
+ return false;
+ }
}
@Override
- public void increaseJournalLogCapacity(int amount) {
+ public void increaseJournalLogCapacity(final int amount) {
journal.ensureCapacity(journal.size() + amount);
}
@Override
- public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
- return getFrom(logEntryIndex, journal.size());
+ public List<ReplicatedLogEntry> getFrom(final long logEntryIndex) {
+ return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
}
@Override
- public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
+ public List<ReplicatedLogEntry> getFrom(final long logEntryIndex, final int maxEntries, final long maxDataSize) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
- int maxIndex = adjustedIndex + max;
- if(maxIndex > size){
+ int maxIndex = adjustedIndex + maxEntries;
+ if (maxIndex > size) {
maxIndex = size;
}
- entries.addAll(journal.subList(adjustedIndex, maxIndex));
+
+ if (maxDataSize == NO_MAX_SIZE) {
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ return copyJournalEntries(adjustedIndex, maxIndex, maxDataSize);
+ }
+ } else {
+ return Collections.emptyList();
}
- return entries;
}
+ private @NonNull List<ReplicatedLogEntry> copyJournalEntries(final int fromIndex, final int toIndex,
+ final long maxDataSize) {
+ List<ReplicatedLogEntry> retList = new ArrayList<>(toIndex - fromIndex);
+ long totalSize = 0;
+ for (int i = fromIndex; i < toIndex; i++) {
+ ReplicatedLogEntry entry = journal.get(i);
+ totalSize += entry.serializedSize();
+ if (totalSize <= maxDataSize) {
+ retList.add(entry);
+ } else {
+ if (retList.isEmpty()) {
+ // Edge case - the first entry's size exceeds the threshold. We need to return
+ // at least the first entry so add it here.
+ retList.add(entry);
+ }
+
+ break;
+ }
+ }
+
+ return retList;
+ }
@Override
public long size() {
- return journal.size();
+ return journal.size();
+ }
+
+ @Override
+ public int dataSize() {
+ return dataSize;
}
@Override
- public boolean isPresent(long logEntryIndex) {
+ public boolean isPresent(final long logEntryIndex) {
if (logEntryIndex > lastIndex()) {
// if the request logical index is less than the last present in the list
return false;
}
int adjustedIndex = adjustedIndex(logEntryIndex);
- return (adjustedIndex >= 0);
- }
-
- @Override
- public boolean isInSnapshot(long logEntryIndex) {
- return logEntryIndex <= snapshotIndex && snapshotIndex != -1;
+ return adjustedIndex >= 0;
}
@Override
- public ByteString getSnapshot() {
- return snapshot;
+ public boolean isInSnapshot(final long logEntryIndex) {
+ return logEntryIndex >= 0 && logEntryIndex <= snapshotIndex && snapshotIndex != -1;
}
@Override
}
@Override
- public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
-
- @Override
- public abstract void removeFromAndPersist(long index);
-
- @Override
- public void setSnapshotIndex(long snapshotIndex) {
+ public void setSnapshotIndex(final long snapshotIndex) {
this.snapshotIndex = snapshotIndex;
}
@Override
- public void setSnapshotTerm(long snapshotTerm) {
+ public void setSnapshotTerm(final long snapshotTerm) {
this.snapshotTerm = snapshotTerm;
}
@Override
- public void setSnapshot(ByteString snapshot) {
- this.snapshot = snapshot;
- }
-
- @Override
- public void clear(int startIndex, int endIndex) {
+ public void clear(final int startIndex, final int endIndex) {
journal.subList(startIndex, endIndex).clear();
}
@Override
- public void snapshotPreCommit(ByteString snapshot, long snapshotCapturedIndex, long snapshotCapturedTerm) {
+ public void snapshotPreCommit(final long snapshotCapturedIndex, final long snapshotCapturedTerm) {
+ Preconditions.checkArgument(snapshotCapturedIndex >= snapshotIndex,
+ "snapshotCapturedIndex must be greater than or equal to snapshotIndex");
+
snapshottedJournal = new ArrayList<>(journal.size());
- snapshottedJournal.addAll(journal.subList(0, (int)(snapshotCapturedIndex - snapshotIndex)));
- clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ List<ReplicatedLogEntry> snapshotJournalEntries =
+ journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
+
+ snapshottedJournal.addAll(snapshotJournalEntries);
+ snapshotJournalEntries.clear();
previousSnapshotIndex = snapshotIndex;
setSnapshotIndex(snapshotCapturedIndex);
previousSnapshotTerm = snapshotTerm;
setSnapshotTerm(snapshotCapturedTerm);
-
- previousSnapshot = getSnapshot();
- setSnapshot(snapshot);
}
@Override
- public void snapshotCommit() {
+ public void snapshotCommit(final boolean updateDataSize) {
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
- previousSnapshot = null;
+
+ if (updateDataSize) {
+ // need to recalc the datasize based on the entries left after precommit.
+ int newDataSize = 0;
+ for (ReplicatedLogEntry logEntry : journal) {
+ newDataSize += logEntry.size();
+ }
+ LOG.trace("{}: Updated dataSize from {} to {}", logContext, dataSize, newDataSize);
+ dataSize = newDataSize;
+ }
}
@Override
snapshotTerm = previousSnapshotTerm;
previousSnapshotTerm = -1;
+ }
- snapshot = previousSnapshot;
- previousSnapshot = null;
-
+ @VisibleForTesting
+ ReplicatedLogEntry getAtPhysicalIndex(final int index) {
+ return journal.get(index);
}
}