import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Abstract class handling the mapping of
* logical LogEntry Index and the physical list index.
*/
public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReplicatedLogImpl.class);
+
+ private final String logContext;
// We define this as ArrayList so we can use ensureCapacity.
private ArrayList<ReplicatedLogEntry> journal;
private int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
- long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
+ long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries, String logContext) {
this.snapshotIndex = snapshotIndex;
this.snapshotTerm = snapshotTerm;
- this.journal = new ArrayList<>(unAppliedEntries);
+ this.logContext = logContext;
- for(ReplicatedLogEntry entry: journal) {
- dataSize += entry.size();
+ this.journal = new ArrayList<>(unAppliedEntries.size());
+ for(ReplicatedLogEntry entry: unAppliedEntries) {
+ append(entry);
}
}
public AbstractReplicatedLogImpl() {
- this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), "");
}
protected int adjustedIndex(long logEntryIndex) {
}
@Override
- public void append(ReplicatedLogEntry replicatedLogEntry) {
- journal.add(replicatedLogEntry);
- dataSize += replicatedLogEntry.size();
+ public boolean append(ReplicatedLogEntry replicatedLogEntry) {
+ if(replicatedLogEntry.getIndex() > lastIndex()) {
+ journal.add(replicatedLogEntry);
+ dataSize += replicatedLogEntry.size();
+ return true;
+ } else {
+ LOG.warn("{}: Cannot append new entry - new index {} is not greater than the last index {}",
+ logContext, replicatedLogEntry.getIndex(), lastIndex(), new Exception("stack trace"));
+ return false;
+ }
}
@Override
@Override
public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
- return getFrom(logEntryIndex, journal.size());
+ return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
}
@Override
- public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
+ public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
- int maxIndex = adjustedIndex + max;
+ int maxIndex = adjustedIndex + maxEntries;
if(maxIndex > size){
maxIndex = size;
}
- return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+
+ if(maxDataSize == NO_MAX_SIZE) {
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
+ long totalSize = 0;
+ for(int i = adjustedIndex; i < maxIndex; i++) {
+ ReplicatedLogEntry entry = journal.get(i);
+ totalSize += entry.size();
+ if(totalSize <= maxDataSize) {
+ retList.add(entry);
+ } else {
+ if(retList.isEmpty()) {
+ // Edge case - the first entry's size exceeds the threshold. We need to return
+ // at least the first entry so add it here.
+ retList.add(entry);
+ }
+
+ break;
+ }
+ }
+
+ return retList;
+ }
} else {
return Collections.emptyList();
}
return snapshotTerm;
}
- @Override
- public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
-
- @Override
- public abstract void removeFromAndPersist(long index);
-
@Override
public void setSnapshotIndex(long snapshotIndex) {
this.snapshotIndex = snapshotIndex;
List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
snapshottedJournal.addAll(snapshotJournalEntries);
- clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ snapshotJournalEntries.clear();
previousSnapshotIndex = snapshotIndex;
setSnapshotIndex(snapshotCapturedIndex);