Unify JournalSegmentWriter.reset(long) 78/111078/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 20:43:19 +0000 (21:43 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 27 Mar 2024 00:15:46 +0000 (01:15 +0100)
We have two distinct implementations here. The version for
StorageLevel.DISK is generic enough to work for StorageLevel.MAPPED, and
will guarantee consistency with reader implementation (which is already
shared across StorageLevels).

JIRA: CONTROLLER-2100
Change-Id: I3e9a5c8a72be766431e5cb527cae1f4809964dd7
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index b634169fe848f34854d16007a600c6dd6734f5e9..7120158839145e9c3f88307fb1eb31238907a809 100644 (file)
@@ -49,9 +49,6 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private final JournalSegmentReader<E> reader;
   private final ByteBuffer buffer;
 
-  private Indexed<E> lastEntry;
-  private int currentPosition;
-
   DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
           final JournalIndex index, final JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
@@ -62,14 +59,12 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     reset(0);
   }
 
-  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
+  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
     super(previous);
 
     buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
     reader = new JournalSegmentReader<>(segment,
         new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
-    lastEntry = previous.getLastEntry();
-    currentPosition = position;
   }
 
   @Override
@@ -79,7 +74,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   MappedJournalSegmentWriter<E> toMapped() {
-    return new MappedJournalSegmentWriter<>(this, currentPosition);
+    return new MappedJournalSegmentWriter<>(this);
   }
 
   @Override
@@ -88,42 +83,8 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  void reset(final long index) {
-      // acquire ownership of cache and make sure reader does not see anything we've done once we're done
-      reader.invalidateCache();
-      try {
-          resetWithBuffer(index);
-      } finally {
-          // Make sure reader does not see anything we've done
-          reader.invalidateCache();
-      }
-  }
-
-  private void resetWithBuffer(final long index) {
-      long nextIndex = firstIndex;
-
-      // Clear the buffer indexes and acquire ownership of the buffer
-      currentPosition = JournalSegmentDescriptor.BYTES;
-      reader.setPosition(JournalSegmentDescriptor.BYTES);
-
-      while (index == 0 || nextIndex <= index) {
-          final var entry = reader.readEntry(nextIndex);
-          if (entry == null) {
-              break;
-          }
-
-          lastEntry = entry;
-          this.index.index(nextIndex, currentPosition);
-          nextIndex++;
-
-          // Update the current position for indexing.
-          currentPosition = currentPosition + HEADER_BYTES + entry.size();
-      }
-  }
-
-  @Override
-  Indexed<E> getLastEntry() {
-    return lastEntry;
+  JournalSegmentReader<E> reader() {
+    return reader;
   }
 
   @Override
index 23b32b2e094889332a2ef2152f7cf8e59b022eb0..f6a7e9498786d58ca7771d161d2706c506f252a6 100644 (file)
@@ -15,6 +15,7 @@
  */
 package io.atomix.storage.journal;
 
+import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.index.JournalIndex;
@@ -32,6 +33,10 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     final int maxEntrySize;
     final long firstIndex;
 
+    // FIXME: hide these two fields
+    Indexed<E> lastEntry;
+    int currentPosition;
+
     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
             final JournalIndex index, final JournalSerdes namespace) {
         this.channel = requireNonNull(channel);
@@ -51,6 +56,8 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
         firstIndex = previous.firstIndex;
+        lastEntry = previous.lastEntry;
+        currentPosition = previous.currentPosition;
     }
 
     /**
@@ -59,8 +66,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The last written index.
      */
     final long getLastIndex() {
-        final Indexed<?> lastEntry;
-        return (lastEntry = getLastEntry()) != null ? lastEntry.index() : firstIndex - 1;
+        return lastEntry != null ? lastEntry.index() : firstIndex - 1;
     }
 
     /**
@@ -68,7 +74,9 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      *
      * @return The last entry written.
      */
-    abstract Indexed<E> getLastEntry();
+    final Indexed<E> getLastEntry() {
+        return lastEntry;
+    }
 
     /**
      * Returns the next index to be written.
@@ -76,8 +84,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The next index to be written.
      */
     final long getNextIndex() {
-        final Indexed<?> lastEntry;
-        return (lastEntry = getLastEntry()) != null ? lastEntry.index() + 1 : firstIndex;
+        return lastEntry != null ? lastEntry.index() + 1 : firstIndex;
     }
 
     /**
@@ -93,7 +100,41 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      *
      * @param index the index to which to reset the head of the segment
      */
-    abstract void reset(long index);
+    final void reset(final long index) {
+        // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+        final var reader = reader();
+        reader.invalidateCache();
+        try {
+            resetWithBuffer(reader, index);
+        } finally {
+            // Make sure reader does not see anything we've done
+            reader.invalidateCache();
+        }
+    }
+
+    abstract JournalSegmentReader<E> reader();
+
+    private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+        long nextIndex = firstIndex;
+
+        // Clear the buffer indexes and acquire ownership of the buffer
+        currentPosition = JournalSegmentDescriptor.BYTES;
+        reader.setPosition(JournalSegmentDescriptor.BYTES);
+
+        while (index == 0 || nextIndex <= index) {
+            final var entry = reader.readEntry(nextIndex);
+            if (entry == null) {
+                break;
+            }
+
+            lastEntry = entry;
+            this.index.index(nextIndex, currentPosition);
+            nextIndex++;
+
+            // Update the current position for indexing.
+            currentPosition = currentPosition + HEADER_BYTES + entry.size();
+        }
+    }
 
     /**
      * Truncates the log to the given index.
index a9fb5b408834119333cd0644821223db0bbf9ee1..9f437b6125f4c68d8ea126434f1b805da5c9c478 100644 (file)
@@ -45,29 +45,27 @@ import org.eclipse.jdt.annotation.NonNull;
  */
 final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private final @NonNull MappedByteBuffer mappedBuffer;
+  private final JournalSegmentReader<E> reader;
   private final ByteBuffer buffer;
 
-  private Indexed<E> lastEntry;
-  private int currentPosition;
-
-  MappedJournalSegmentWriter(
-      final FileChannel channel,
-      final JournalSegment<E> segment,
-      final int maxEntrySize,
-      final JournalIndex index,
-      final JournalSerdes namespace) {
+  MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+      final JournalIndex index, final JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
+
     mappedBuffer = mapBuffer(channel, maxSegmentSize);
     buffer = mappedBuffer.slice();
+    reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+        maxEntrySize, namespace);
     reset(0);
   }
 
-  MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
+  MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
     super(previous);
+
     mappedBuffer = mapBuffer(channel, maxSegmentSize);
     buffer = mappedBuffer.slice();
-    currentPosition = position;
-    lastEntry = previous.getLastEntry();
+    reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+        maxEntrySize, namespace);
   }
 
   private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
@@ -91,55 +89,12 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   @Override
   DiskJournalSegmentWriter<E> toFileChannel() {
     close();
-    return new DiskJournalSegmentWriter<>(this, currentPosition);
-  }
-
-  @Override
-  void reset(final long index) {
-    long nextIndex = firstIndex;
-
-    // Clear the buffer indexes.
-    currentPosition = JournalSegmentDescriptor.BYTES;
-
-    int length = buffer.getInt(currentPosition);
-
-    // If the length is non-zero, read the entry.
-    while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
-
-      // Read the checksum of the entry.
-      final long checksum = buffer.getInt(currentPosition + Integer.BYTES);
-
-      // Slice off the entry's bytes
-      final var entryBytes = buffer.slice(currentPosition + SegmentEntry.HEADER_BYTES, length);
-
-      // Compute the checksum for the entry bytes.
-      final var crc32 = new CRC32();
-      crc32.update(entryBytes);
-
-      // If the stored checksum does not equal the computed checksum, do not proceed further
-      if (checksum != (int) crc32.getValue()) {
-          break;
-      }
-
-      entryBytes.rewind();
-      final E entry = namespace.deserialize(entryBytes);
-      lastEntry = new Indexed<>(nextIndex, entry, length);
-      this.index.index(nextIndex, currentPosition);
-      nextIndex++;
-
-      // Update the current position for indexing.
-      currentPosition = currentPosition + SegmentEntry.HEADER_BYTES + length;
-
-      if (currentPosition + SegmentEntry.HEADER_BYTES >= maxSegmentSize) {
-          break;
-      }
-      length = buffer.getInt(currentPosition);
-    }
+    return new DiskJournalSegmentWriter<>(this);
   }
 
   @Override
-  Indexed<E> getLastEntry() {
-    return lastEntry;
+  JournalSegmentReader<E> reader() {
+    return reader;
   }
 
   @Override