private int currentPosition;
JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex) {
+ final JournalIndex journalIndex, final int currentPosition) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.journalIndex = requireNonNull(journalIndex);
- maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
-
- // recover currentPosition and lastIndex
- reset(Long.MAX_VALUE, null);
+ this.currentPosition = currentPosition;
+ maxSegmentSize = segment.file().maxSize();
}
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
// Truncate the index, find nearest indexed entry
final var nearest = journalIndex.truncate(index);
- // recover position and last written
- if (index >= segment.firstIndex()) {
- reset(index, nearest);
- } else {
- currentPosition = JournalSegmentDescriptor.BYTES;
- }
+ currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
+ // recover position and last written
+ : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
// Zero the entry header at current channel position.
fileWriter.writeEmptyHeader(currentPosition);
}
- private void reset(final long maxNextIndex, final @Nullable Position position) {
- // acquire ownership of cache and make sure reader does not see anything we've done once we're done
- final var fileReader = fileWriter.reader();
- try {
- reset(fileReader, maxNextIndex, position);
- } finally {
- // Make sure reader does not see anything we've done
- fileReader.invalidateCache();
- }
- }
-
- private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
- long nextIndex;
- if (position != null) {
- // look from nearest recovered index
- nextIndex = position.index();
- currentPosition = position.position();
- } else {
- // look from very beginning of the segment
- nextIndex = segment.firstIndex();
- currentPosition = JournalSegmentDescriptor.BYTES;
- }
-
- final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
- reader.setPosition(currentPosition);
-
- while (nextIndex <= maxNextIndex) {
- final var buf = reader.readBytes();
- if (buf == null) {
- break;
- }
-
- journalIndex.index(nextIndex++, currentPosition);
- // Update the current position for indexing.
- currentPosition += HEADER_BYTES + buf.readableBytes();
- }
- }
-
/**
* Flushes written entries to disk.
*/