X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FJournalSegmentWriter.java;fp=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FJournalSegmentWriter.java;h=f9e13274059933aae38f2dc192e38fda93911ac1;hb=8668e863c9ede1a59b16998ec5efd7b1b619573a;hp=70cc790389484850e9e2e3bf0ec7379e583f5dc0;hpb=91bf2f5b4f1e0880ffa133b3d2a920063392811c;p=controller.git diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 70cc790389..f9e1327405 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -46,8 +46,9 @@ final class JournalSegmentWriter { this.journalIndex = requireNonNull(journalIndex); maxSegmentSize = segment.file().maxSize(); this.maxEntrySize = maxEntrySize; - // adjust lastEntry value - reset(0); + + // recover currentPosition and lastIndex + reset(Long.MAX_VALUE, null); } JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) { @@ -108,68 +109,68 @@ final class JournalSegmentWriter { } /** - * Resets the head of the segment to the given index. + * Truncates the log to the given index. * - * @param index the index to which to reset the head of the segment + * @param index The index to which to truncate the log. */ - void reset(final long index) { + void truncate(final long index) { + // If the index is greater than or equal to the last index, skip the truncate. + if (index >= segment.lastIndex()) { + return; + } + + // Truncate the index, find nearest indexed entry + final var nearest = journalIndex.truncate(index); + + // recover position and last written + if (index >= segment.firstIndex()) { + reset(index, nearest); + } else { + currentPosition = JournalSegmentDescriptor.BYTES; + } + + // Zero the entry header at current channel position. + fileWriter.writeEmptyHeader(currentPosition); + } + + private void reset(final long maxNextIndex, final @Nullable Position position) { // acquire ownership of cache and make sure reader does not see anything we've done once we're done final var fileReader = fileWriter.reader(); try { - resetWithBuffer(fileReader, index); + reset(fileReader, maxNextIndex, position); } finally { // Make sure reader does not see anything we've done fileReader.invalidateCache(); } } - private void resetWithBuffer(final FileReader fileReader, final long index) { - long nextIndex = segment.firstIndex(); + private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) { + long nextIndex; + if (position != null) { + // look from nearest recovered index + nextIndex = position.index(); + currentPosition = position.position(); + } else { + // look from very beginning of the segment + nextIndex = segment.firstIndex(); + currentPosition = JournalSegmentDescriptor.BYTES; + } - // Clear the buffer indexes and acquire ownership of the buffer - currentPosition = JournalSegmentDescriptor.BYTES; final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize); - reader.setPosition(JournalSegmentDescriptor.BYTES); + reader.setPosition(currentPosition); - while (index == 0 || nextIndex <= index) { + while (nextIndex <= maxNextIndex) { final var buf = reader.readBytes(); if (buf == null) { break; } journalIndex.index(nextIndex++, currentPosition); - // Update the current position for indexing. currentPosition += HEADER_BYTES + buf.readableBytes(); } } - /** - * Truncates the log to the given index. - * - * @param index The index to which to truncate the log. - */ - void truncate(final long index) { - // If the index is greater than or equal to the last index, skip the truncate. - if (index >= segment.lastIndex()) { - return; - } - - // Truncate the index. - journalIndex.truncate(index); - - if (index < segment.firstIndex()) { - // Reset the writer to the first entry. - currentPosition = JournalSegmentDescriptor.BYTES; - } else { - // Reset the writer to the given index. - reset(index); - } - - // Zero the entry header at current channel position. - fileWriter.writeEmptyHeader(currentPosition); - } - /** * Flushes written entries to disk. */