Utilize segment index to recover writer state after truncate 65/111465/7
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Fri, 19 Apr 2024 08:15:58 +0000 (11:15 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 8 May 2024 15:11:34 +0000 (17:11 +0200)
index.truncate() returns nearest indexed entry position which
can be used to minimize number of loops required to recover
writer state - current position and last written entry.

JIRA: CONTROLLER-2100
Change-Id: I6875c1697a2ac5f13b82b256850f293a7658f220
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java

index 70cc790389484850e9e2e3bf0ec7379e583f5dc0..f9e13274059933aae38f2dc192e38fda93911ac1 100644 (file)
@@ -46,8 +46,9 @@ final class JournalSegmentWriter {
         this.journalIndex = requireNonNull(journalIndex);
         maxSegmentSize = segment.file().maxSize();
         this.maxEntrySize = maxEntrySize;
-        // adjust lastEntry value
-        reset(0);
+
+        // recover currentPosition and lastIndex
+        reset(Long.MAX_VALUE, null);
     }
 
     JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
@@ -108,68 +109,68 @@ final class JournalSegmentWriter {
     }
 
     /**
-     * Resets the head of the segment to the given index.
+     * Truncates the log to the given index.
      *
-     * @param index the index to which to reset the head of the segment
+     * @param index The index to which to truncate the log.
      */
-    void reset(final long index) {
+    void truncate(final long index) {
+        // If the index is greater than or equal to the last index, skip the truncate.
+        if (index >= segment.lastIndex()) {
+            return;
+        }
+
+        // Truncate the index, find nearest indexed entry
+        final var nearest = journalIndex.truncate(index);
+
+        // recover position and last written
+        if (index >= segment.firstIndex()) {
+            reset(index, nearest);
+        } else {
+            currentPosition = JournalSegmentDescriptor.BYTES;
+        }
+
+        // Zero the entry header at current channel position.
+        fileWriter.writeEmptyHeader(currentPosition);
+    }
+
+    private void reset(final long maxNextIndex, final @Nullable Position position) {
         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
         final var fileReader = fileWriter.reader();
         try {
-            resetWithBuffer(fileReader, index);
+            reset(fileReader, maxNextIndex, position);
         } finally {
             // Make sure reader does not see anything we've done
             fileReader.invalidateCache();
         }
     }
 
-    private void resetWithBuffer(final FileReader fileReader, final long index) {
-        long nextIndex = segment.firstIndex();
+    private void reset(final FileReader fileReader, final long maxNextIndex, final @Nullable Position position) {
+        long nextIndex;
+        if (position != null) {
+            // look from nearest recovered index
+            nextIndex = position.index();
+            currentPosition = position.position();
+        } else {
+            // look from very beginning of the segment
+            nextIndex = segment.firstIndex();
+            currentPosition = JournalSegmentDescriptor.BYTES;
+        }
 
-        // Clear the buffer indexes and acquire ownership of the buffer
-        currentPosition = JournalSegmentDescriptor.BYTES;
         final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
-        reader.setPosition(JournalSegmentDescriptor.BYTES);
+        reader.setPosition(currentPosition);
 
-        while (index == 0 || nextIndex <= index) {
+        while (nextIndex <= maxNextIndex) {
             final var buf = reader.readBytes();
             if (buf == null) {
                 break;
             }
 
             journalIndex.index(nextIndex++, currentPosition);
-
             // Update the current position for indexing.
             currentPosition += HEADER_BYTES + buf.readableBytes();
         }
     }
 
-    /**
-     * Truncates the log to the given index.
-     *
-     * @param index The index to which to truncate the log.
-     */
-    void truncate(final long index) {
-        // If the index is greater than or equal to the last index, skip the truncate.
-        if (index >= segment.lastIndex()) {
-            return;
-        }
-
-        // Truncate the index.
-        journalIndex.truncate(index);
-
-        if (index < segment.firstIndex()) {
-            // Reset the writer to the first entry.
-            currentPosition = JournalSegmentDescriptor.BYTES;
-        } else {
-            // Reset the writer to the given index.
-            reset(index);
-        }
-
-        // Zero the entry header at current channel position.
-        fileWriter.writeEmptyHeader(currentPosition);
-    }
-
     /**
      * Flushes written entries to disk.
      */