*
* @return The segment's starting index.
*/
- long index() {
+ long firstIndex() {
return descriptor.index();
}
return MoreObjects.toStringHelper(this)
.add("id", descriptor.id())
.add("version", descriptor.version())
- .add("index", index())
+ .add("index", firstIndex())
.toString();
}
}
* @return The last written index.
*/
final long getLastIndex() {
- return lastEntry != null ? lastEntry.index() : segment.index() - 1;
+ return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
}
/**
* @return The next index to be written.
*/
final long getNextIndex() {
- return lastEntry != null ? lastEntry.index() + 1 : segment.index();
+ return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
}
/**
abstract JournalSegmentReader<E> reader();
private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
- long nextIndex = segment.index();
+ long nextIndex = segment.firstIndex();
// Clear the buffer indexes and acquire ownership of the buffer
currentPosition = JournalSegmentDescriptor.BYTES;
// Truncate the index.
this.index.truncate(index);
- if (index < segment.index()) {
+ if (index < segment.firstIndex()) {
// Reset the writer to the first entry.
currentPosition = JournalSegmentDescriptor.BYTES;
} else {
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
-import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
// If the index already equals the first segment index, skip the reset.
JournalSegment<E> firstSegment = getFirstSegment();
- if (index == firstSegment.index()) {
+ if (index == firstSegment.firstIndex()) {
return firstSegment;
}
synchronized JournalSegment<E> getSegment(long index) {
assertOpen();
// Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
- if (currentSegment != null && index > currentSegment.index()) {
+ if (currentSegment != null && index > currentSegment.firstIndex()) {
return currentSegment;
}
* @param segment The segment to remove.
*/
synchronized void removeSegment(JournalSegment<E> segment) {
- segments.remove(segment.index());
+ segments.remove(segment.firstIndex());
segment.close();
segment.delete();
resetCurrentSegment();
// Add the segment to the segments list.
LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
- segments.put(segment.index(), segment);
+ segments.put(segment.firstIndex(), segment);
}
}
Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
while (iterator.hasNext()) {
JournalSegment<E> segment = iterator.next().getValue();
- if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
+ if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
corrupted = true;
}
*/
public boolean isCompactable(long index) {
Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
+ return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
}
/**
*/
public long getCompactableIndex(long index) {
Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null ? segmentEntry.getValue().index() : 0;
+ return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
}
/**
* @param index The index up to which to compact the journal.
*/
public void compact(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
+ final var segmentEntry = segments.floorEntry(index);
if (segmentEntry != null) {
- SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
+ final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
if (!compactSegments.isEmpty()) {
LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
for (JournalSegment<E> segment : compactSegments.values()) {
segment.delete();
}
compactSegments.clear();
- resetHead(segmentEntry.getValue().index());
+ resetHead(segmentEntry.getValue().firstIndex());
}
}
}
this.journal = requireNonNull(journal);
currentSegment = requireNonNull(segment);
currentReader = segment.createReader();
- nextIndex = currentSegment.index();
+ nextIndex = currentSegment.firstIndex();
currentEntry = null;
}
@Override
public final long getFirstIndex() {
- return journal.getFirstSegment().index();
+ return journal.getFirstSegment().firstIndex();
}
@Override
currentSegment = journal.getFirstSegment();
currentReader = currentSegment.createReader();
- nextIndex = currentSegment.index();
+ nextIndex = currentSegment.firstIndex();
currentEntry = null;
}
nextIndex = position.index();
currentReader.setPosition(position.position());
} else {
- nextIndex = currentSegment.index();
+ nextIndex = currentSegment.firstIndex();
currentReader.setPosition(JournalSegmentDescriptor.BYTES);
}
while (nextIndex < index && tryNext() != null) {
* Rewinds the journal to the given index.
*/
private void rewind(final long index) {
- if (currentSegment.index() >= index) {
+ if (currentSegment.firstIndex() >= index) {
JournalSegment<E> segment = journal.getSegment(index - 1);
if (segment != null) {
currentReader.close();
public Indexed<E> tryNext() {
var next = currentReader.readEntry(nextIndex);
if (next == null) {
- final var nextSegment = journal.getNextSegment(currentSegment.index());
- if (nextSegment == null || nextSegment.index() != nextIndex) {
+ final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
+ if (nextSegment == null || nextSegment.firstIndex() != nextIndex) {
return null;
}
@Override
public void reset(long index) {
- if (index > currentSegment.index()) {
+ if (index > currentSegment.firstIndex()) {
currentSegment.releaseWriter();
currentSegment = journal.resetSegments(index);
currentWriter = currentSegment.acquireWriter();
try {
return currentWriter.append(entry);
} catch (BufferOverflowException e) {
- if (currentSegment.index() == currentWriter.getNextIndex()) {
+ if (currentSegment.firstIndex() == currentWriter.getNextIndex()) {
throw e;
}
}
}
// Delete all segments with first indexes greater than the given index.
- while (index < currentSegment.index() && currentSegment != journal.getFirstSegment()) {
+ while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) {
currentSegment.releaseWriter();
journal.removeSegment(currentSegment);
currentSegment = journal.getLastSegment();