Rework MappedJournalSegmentWriter position tracking 77/111077/4
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 20:03:48 +0000 (21:03 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 27 Mar 2024 00:15:46 +0000 (01:15 +0100)
Use an internal field to track the position. Also align entry reading
logic by slicing the appropriate buffer.

JIRA: CONTROLLER-2100
Change-Id: Iaa36fb32d9ffb7b215bef7829f8d3e7e5728e4b9
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index 99180c5840321165696796a45234f5dcfcbb237e..a9fb5b408834119333cd0644821223db0bbf9ee1 100644 (file)
@@ -22,7 +22,6 @@ import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -49,27 +48,29 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private final ByteBuffer buffer;
 
   private Indexed<E> lastEntry;
+  private int currentPosition;
 
   MappedJournalSegmentWriter(
-      FileChannel channel,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalIndex index,
-      JournalSerdes namespace) {
+      final FileChannel channel,
+      final JournalSegment<E> segment,
+      final int maxEntrySize,
+      final JournalIndex index,
+      final JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
     mappedBuffer = mapBuffer(channel, maxSegmentSize);
     buffer = mappedBuffer.slice();
     reset(0);
   }
 
-  MappedJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+  MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
     super(previous);
     mappedBuffer = mapBuffer(channel, maxSegmentSize);
-    buffer = mappedBuffer.slice().position(position);
+    buffer = mappedBuffer.slice();
+    currentPosition = position;
     lastEntry = previous.getLastEntry();
   }
 
-  private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) {
+  private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
     try {
       return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
     } catch (IOException e) {
@@ -89,63 +90,50 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   DiskJournalSegmentWriter<E> toFileChannel() {
-    final int position = buffer.position();
     close();
-    return new DiskJournalSegmentWriter<>(this, position);
+    return new DiskJournalSegmentWriter<>(this, currentPosition);
   }
 
   @Override
-  void reset(long index) {
+  void reset(final long index) {
     long nextIndex = firstIndex;
 
     // Clear the buffer indexes.
-    buffer.position(JournalSegmentDescriptor.BYTES);
-
-    // Record the current buffer position.
-    int position = buffer.position();
+    currentPosition = JournalSegmentDescriptor.BYTES;
 
-    // Read the entry length.
-    buffer.mark();
-
-    try {
-      int length = buffer.getInt();
+    int length = buffer.getInt(currentPosition);
 
-      // If the length is non-zero, read the entry.
-      while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
+    // If the length is non-zero, read the entry.
+    while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
 
-        // Read the checksum of the entry.
-        final long checksum = buffer.getInt() & 0xFFFFFFFFL;
+      // Read the checksum of the entry.
+      final long checksum = buffer.getInt(currentPosition + Integer.BYTES);
 
-        // Slice off the entry's bytes
-        final ByteBuffer entryBytes = buffer.slice();
-        entryBytes.limit(length);
+      // Slice off the entry's bytes
+      final var entryBytes = buffer.slice(currentPosition + SegmentEntry.HEADER_BYTES, length);
 
-        // Compute the checksum for the entry bytes.
-        final CRC32 crc32 = new CRC32();
-        crc32.update(entryBytes);
+      // Compute the checksum for the entry bytes.
+      final var crc32 = new CRC32();
+      crc32.update(entryBytes);
 
-        // If the stored checksum does not equal the computed checksum, do not proceed further
-        if (checksum != crc32.getValue()) {
+      // If the stored checksum does not equal the computed checksum, do not proceed further
+      if (checksum != (int) crc32.getValue()) {
           break;
-        }
+      }
 
-        entryBytes.rewind();
-        final E entry = namespace.deserialize(entryBytes);
-        lastEntry = new Indexed<>(nextIndex, entry, length);
-        this.index.index(nextIndex, position);
-        nextIndex++;
+      entryBytes.rewind();
+      final E entry = namespace.deserialize(entryBytes);
+      lastEntry = new Indexed<>(nextIndex, entry, length);
+      this.index.index(nextIndex, currentPosition);
+      nextIndex++;
 
-        // Update the current position for indexing.
-        position = buffer.position() + length;
-        buffer.position(position);
+      // Update the current position for indexing.
+      currentPosition = currentPosition + SegmentEntry.HEADER_BYTES + length;
 
-        length = buffer.mark().getInt();
+      if (currentPosition + SegmentEntry.HEADER_BYTES >= maxSegmentSize) {
+          break;
       }
-
-      // Reset the buffer to the previous mark.
-      buffer.reset();
-    } catch (BufferUnderflowException e) {
-      buffer.reset();
+      length = buffer.getInt(currentPosition);
     }
   }
 
@@ -156,53 +144,50 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(T entry) {
+  <T extends E> Indexed<T> append(final T entry) {
     // Store the entry index.
     final long index = getNextIndex();
 
     // Serialize the entry.
-    int position = buffer.position();
-    if (position + HEADER_BYTES > buffer.limit()) {
+    final int bodyPosition = currentPosition + HEADER_BYTES;
+    final int avail = maxSegmentSize - bodyPosition;
+    if (avail < 0) {
       throw new BufferOverflowException();
     }
 
-    buffer.position(position + HEADER_BYTES);
-
+    final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize));
     try {
-      namespace.serialize(entry, buffer);
+      namespace.serialize(entry, entryBytes);
     } catch (KryoException e) {
-      throw new BufferOverflowException();
-    }
-
-    final int length = buffer.position() - (position + HEADER_BYTES);
+      if (entryBytes.capacity() != maxEntrySize) {
+        // We have not provided enough capacity, signal to roll to next segment
+        throw new BufferOverflowException();
+      }
 
-    // If the entry length exceeds the maximum entry size then throw an exception.
-    if (length > maxEntrySize) {
       // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
-      buffer.position(position);
-      throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
+      throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
     }
 
+    final int length = entryBytes.position();
+
     // Compute the checksum for the entry.
-    final CRC32 crc32 = new CRC32();
-    buffer.position(position + HEADER_BYTES);
-    ByteBuffer slice = buffer.slice();
-    slice.limit(length);
-    crc32.update(slice);
-    final long checksum = crc32.getValue();
+    final var crc32 = new CRC32();
+    crc32.update(entryBytes.flip());
 
     // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-    buffer.position(position).putInt(length).putInt((int) checksum).position(position + HEADER_BYTES + length);
+    buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue());
 
     // Update the last entry with the correct index/term/length.
     Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
-    this.lastEntry = indexedEntry;
-    this.index.index(index, position);
+    lastEntry = indexedEntry;
+    this.index.index(index, currentPosition);
+
+    currentPosition = currentPosition + HEADER_BYTES + length;
     return (Indexed<T>) indexedEntry;
   }
 
   @Override
-  void truncate(long index) {
+  void truncate(final long index) {
     // If the index is greater than or equal to the last index, skip the truncate.
     if (index >= getLastIndex()) {
       return;
@@ -216,16 +201,15 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
     if (index < firstIndex) {
       // Reset the writer to the first entry.
-      buffer.position(JournalSegmentDescriptor.BYTES);
+      currentPosition = JournalSegmentDescriptor.BYTES;
     } else {
       // Reset the writer to the given index.
       reset(index);
     }
 
     // Zero the entry header at current buffer position.
-    int position = buffer.position();
     // Note: we issue a single putLong() instead of two putInt()s.
-    buffer.putLong(0).position(position);
+    buffer.putLong(currentPosition, 0L);
   }
 
   @Override