private final JournalSegmentReader<E> reader;
private final ByteBuffer buffer;
- private Indexed<E> lastEntry;
- private int currentPosition;
-
DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
final JournalIndex index, final JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
reset(0);
}
- DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
+ DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
super(previous);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
reader = new JournalSegmentReader<>(segment,
new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
- lastEntry = previous.getLastEntry();
- currentPosition = position;
}
@Override
@Override
MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(this, currentPosition);
+ return new MappedJournalSegmentWriter<>(this);
}
@Override
}
@Override
- void reset(final long index) {
- // acquire ownership of cache and make sure reader does not see anything we've done once we're done
- reader.invalidateCache();
- try {
- resetWithBuffer(index);
- } finally {
- // Make sure reader does not see anything we've done
- reader.invalidateCache();
- }
- }
-
- private void resetWithBuffer(final long index) {
- long nextIndex = firstIndex;
-
- // Clear the buffer indexes and acquire ownership of the buffer
- currentPosition = JournalSegmentDescriptor.BYTES;
- reader.setPosition(JournalSegmentDescriptor.BYTES);
-
- while (index == 0 || nextIndex <= index) {
- final var entry = reader.readEntry(nextIndex);
- if (entry == null) {
- break;
- }
-
- lastEntry = entry;
- this.index.index(nextIndex, currentPosition);
- nextIndex++;
-
- // Update the current position for indexing.
- currentPosition = currentPosition + HEADER_BYTES + entry.size();
- }
- }
-
- @Override
- Indexed<E> getLastEntry() {
- return lastEntry;
+ JournalSegmentReader<E> reader() {
+ return reader;
}
@Override