@Override
public List<ReplicatedLogEntry> getFrom(long logEntryIndex) {
- return getFrom(logEntryIndex, journal.size());
+ return getFrom(logEntryIndex, journal.size(), NO_MAX_SIZE);
}
@Override
- public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
+ public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int maxEntries, long maxDataSize) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
- int maxIndex = adjustedIndex + max;
+ int maxIndex = adjustedIndex + maxEntries;
if(maxIndex > size){
maxIndex = size;
}
- return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+
+ if(maxDataSize == NO_MAX_SIZE) {
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ List<ReplicatedLogEntry> retList = new ArrayList<>(maxIndex - adjustedIndex);
+ long totalSize = 0;
+ for(int i = adjustedIndex; i < maxIndex; i++) {
+ ReplicatedLogEntry entry = journal.get(i);
+ totalSize += entry.size();
+ if(totalSize <= maxDataSize) {
+ retList.add(entry);
+ } else {
+ if(retList.isEmpty()) {
+ // Edge case - the first entry's size exceeds the threshold. We need to return
+ // at least the first entry so add it here.
+ retList.add(entry);
+ }
+
+ break;
+ }
+ }
+
+ return retList;
+ }
} else {
return Collections.emptyList();
}
List<ReplicatedLogEntry> snapshotJournalEntries = journal.subList(0, (int) (snapshotCapturedIndex - snapshotIndex));
snapshottedJournal.addAll(snapshotJournalEntries);
- clear(0, (int) (snapshotCapturedIndex - snapshotIndex));
+ snapshotJournalEntries.clear();
previousSnapshotIndex = snapshotIndex;
setSnapshotIndex(snapshotCapturedIndex);