this.journalIndex = requireNonNull(journalIndex);
maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
- // adjust lastEntry value
- reset(0);
+
+ // recover currentPosition and lastIndex
+ reset(Long.MAX_VALUE, null);
}
JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
}
/**
- * Resets the head of the segment to the given index.
+ * Truncates the log to the given index.
*
- * @param index the index to which to reset the head of the segment
+ * @param index The index to which to truncate the log.
*/
- void reset(final long index) {
+ void truncate(final long index) {
+ // If the index is greater than or equal to the last index, skip the truncate.
+ if (index >= segment.lastIndex()) {
+ return;
+ }
+
+ // Truncate the index, find nearest indexed entry
+ final var nearest = journalIndex.truncate(index);
+
+ // recover position and last written
+ if (index >= segment.firstIndex()) {
+ reset(index, nearest);
+ } else {
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ }
+
+ // Zero the entry header at current channel position.
+ fileWriter.writeEmptyHeader(currentPosition);
+ }
+
+ private void reset(final long maxNextIndex, final @Nullable Position position) {
// acquire ownership of cache and make sure reader does not see anything we've done once we're done
final var fileReader = fileWriter.reader();
try {
- resetWithBuffer(fileReader, index);
+ reset(fileReader, maxNextIndex, position);
} finally {
// Make sure reader does not see anything we've done
fileReader.invalidateCache();
}
}
- private void resetWithBuffer(final FileReader fileReader, final long index) {
- long nextIndex = segment.firstIndex();
+ private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
+ long nextIndex;
+ if (position != null) {
+ // look from nearest recovered index
+ nextIndex = position.index();
+ currentPosition = position.position();
+ } else {
+ // look from very beginning of the segment
+ nextIndex = segment.firstIndex();
+ currentPosition = JournalSegmentDescriptor.BYTES;
+ }
- // Clear the buffer indexes and acquire ownership of the buffer
- currentPosition = JournalSegmentDescriptor.BYTES;
final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
- reader.setPosition(JournalSegmentDescriptor.BYTES);
+ reader.setPosition(currentPosition);
- while (index == 0 || nextIndex <= index) {
+ while (nextIndex <= maxNextIndex) {
final var buf = reader.readBytes();
if (buf == null) {
break;
}
journalIndex.index(nextIndex++, currentPosition);
-
// Update the current position for indexing.
currentPosition += HEADER_BYTES + buf.readableBytes();
}
}
- /**
- * Truncates the log to the given index.
- *
- * @param index The index to which to truncate the log.
- */
- void truncate(final long index) {
- // If the index is greater than or equal to the last index, skip the truncate.
- if (index >= segment.lastIndex()) {
- return;
- }
-
- // Truncate the index.
- journalIndex.truncate(index);
-
- if (index < segment.firstIndex()) {
- // Reset the writer to the first entry.
- currentPosition = JournalSegmentDescriptor.BYTES;
- } else {
- // Reset the writer to the given index.
- reset(index);
- }
-
- // Zero the entry header at current channel position.
- fileWriter.writeEmptyHeader(currentPosition);
- }
-
/**
* Flushes written entries to disk.
*/