Unify JournalSegmentWriter.truncate() 79/111079/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 20:59:30 +0000 (21:59 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 27 Mar 2024 00:15:46 +0000 (01:15 +0100)
Implementations of truncate are practically identical, merge them into a
common method. Also eliminate JournalSegmentWriter.firstIndex, as we can
get it from the segment at any time.

JIRA: CONTROLLER-2100
Change-Id: I019978461b67693f109cd6e90c8fba2ed8b18a3f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index 7120158839145e9c3f88307fb1eb31238907a809..d3aa3332c7a56eddd71c4f36193398e386d78323 100644 (file)
@@ -134,29 +134,9 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  void truncate(final long index) {
-    // If the index is greater than or equal to the last index, skip the truncate.
-    if (index >= getLastIndex()) {
-      return;
-    }
-
-    // Reset the last entry.
-    lastEntry = null;
-
-    // Truncate the index.
-    this.index.truncate(index);
-
+  void writeEmptyHeader(final int position) {
     try {
-      if (index < firstIndex) {
-        // Reset the writer to the first entry.
-        currentPosition = JournalSegmentDescriptor.BYTES;
-      } else {
-        // Reset the writer to the given index.
-        reset(index);
-      }
-
-      // Zero the entry header at current channel position.
-      channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), currentPosition);
+      channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
     } catch (IOException e) {
       throw new StorageException(e);
     }
index f6a7e9498786d58ca7771d161d2706c506f252a6..e08e50de9e8f62befbbcc5208ee73c79ece6aa38 100644 (file)
@@ -31,7 +31,6 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
     final int maxEntrySize;
-    final long firstIndex;
 
     // FIXME: hide these two fields
     Indexed<E> lastEntry;
@@ -45,7 +44,6 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         this.namespace = requireNonNull(namespace);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
-        firstIndex = segment.index();
     }
 
     JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
@@ -55,7 +53,6 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         namespace = previous.namespace;
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
-        firstIndex = previous.firstIndex;
         lastEntry = previous.lastEntry;
         currentPosition = previous.currentPosition;
     }
@@ -66,7 +63,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The last written index.
      */
     final long getLastIndex() {
-        return lastEntry != null ? lastEntry.index() : firstIndex - 1;
+        return lastEntry != null ? lastEntry.index() : segment.index() - 1;
     }
 
     /**
@@ -84,7 +81,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The next index to be written.
      */
     final long getNextIndex() {
-        return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
+        return lastEntry != null ? lastEntry.index() + 1 : segment.index();
     }
 
     /**
@@ -115,7 +112,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     abstract JournalSegmentReader<E> reader();
 
     private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
-        long nextIndex = firstIndex;
+        long nextIndex = segment.index();
 
         // Clear the buffer indexes and acquire ownership of the buffer
         currentPosition = JournalSegmentDescriptor.BYTES;
@@ -141,7 +138,36 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      *
      * @param index The index to which to truncate the log.
      */
-    abstract void truncate(long index);
+    final void truncate(final long index) {
+        // If the index is greater than or equal to the last index, skip the truncate.
+        if (index >= getLastIndex()) {
+          return;
+        }
+
+        // Reset the last entry.
+        lastEntry = null;
+
+        // Truncate the index.
+        this.index.truncate(index);
+
+        if (index < segment.index()) {
+          // Reset the writer to the first entry.
+          currentPosition = JournalSegmentDescriptor.BYTES;
+        } else {
+          // Reset the writer to the given index.
+          reset(index);
+        }
+
+        // Zero the entry header at current channel position.
+        writeEmptyHeader(currentPosition);
+    }
+
+    /**
+     * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
+     *
+     * @param position position to write to
+     */
+    abstract void writeEmptyHeader(int position);
 
     /**
      * Flushes written entries to disk.
index 9f437b6125f4c68d8ea126434f1b805da5c9c478..ffb08d29e1ba7f4788088e13b18d849ac5294705 100644 (file)
@@ -142,29 +142,9 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  void truncate(final long index) {
-    // If the index is greater than or equal to the last index, skip the truncate.
-    if (index >= getLastIndex()) {
-      return;
-    }
-
-    // Reset the last entry.
-    lastEntry = null;
-
-    // Truncate the index.
-    this.index.truncate(index);
-
-    if (index < firstIndex) {
-      // Reset the writer to the first entry.
-      currentPosition = JournalSegmentDescriptor.BYTES;
-    } else {
-      // Reset the writer to the given index.
-      reset(index);
-    }
-
-    // Zero the entry header at current buffer position.
-    // Note: we issue a single putLong() instead of two putInt()s.
-    buffer.putLong(currentPosition, 0L);
+  void writeEmptyHeader(final int position) {
+      // Note: we issue a single putLong() instead of two putInt()s.
+      buffer.putLong(position, 0L);
   }
 
   @Override