Centralize JournalSegmentWriter.append() 81/111081/3
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 26 Mar 2024 23:04:35 +0000 (00:04 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 27 Mar 2024 00:15:46 +0000 (01:15 +0100)
We have almost-identical implementations of append(), differing only
slightly in how the do buffer management.

Expose a ByteBuffer-based API to writing to segment file, in terms of
abstract methods and pull the implementation down into the superclass.

JIRA: CONTROLLER-2100
Change-Id: I8d5629019d1c5d2978b0861e30e80346fbfc3c31
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index d3aa3332c7a56eddd71c4f36193398e386d78323..266320127b214056942380d9d5f9a73437242557 100644 (file)
@@ -18,15 +18,11 @@ package io.atomix.storage.journal;
 
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 
-import com.esotericsoftware.kryo.KryoException;
-import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
 
 /**
  * Segment writer.
@@ -44,117 +40,85 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
-  private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
-
-  private final JournalSegmentReader<E> reader;
-  private final ByteBuffer buffer;
-
-  DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-          final JournalIndex index, final JournalSerdes namespace) {
-    super(channel, segment, maxEntrySize, index, namespace);
-
-    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-    reader = new JournalSegmentReader<>(segment,
-        new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
-    reset(0);
-  }
-
-  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
-    super(previous);
-
-    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-    reader = new JournalSegmentReader<>(segment,
-        new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
-  }
-
-  @Override
-  MappedByteBuffer buffer() {
-    return null;
-  }
-
-  @Override
-  MappedJournalSegmentWriter<E> toMapped() {
-    return new MappedJournalSegmentWriter<>(this);
-  }
-
-  @Override
-  DiskJournalSegmentWriter<E> toFileChannel() {
-    return this;
-  }
-
-  @Override
-  JournalSegmentReader<E> reader() {
-    return reader;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(final T entry) {
-      // Store the entry index.
-      final long index = getNextIndex();
-
-      // Serialize the entry.
-      try {
-          namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
-      } catch (KryoException e) {
-          throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-      }
-      buffer.flip();
-
-      final int length = buffer.limit() - HEADER_BYTES;
-      // Ensure there's enough space left in the buffer to store the entry.
-      if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
-          throw new BufferOverflowException();
-      }
-
-      // If the entry length exceeds the maximum entry size then throw an exception.
-      if (length > maxEntrySize) {
-          throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
-      }
-
-      // Compute the checksum for the entry.
-      final var crc32 = new CRC32();
-      crc32.update(buffer.slice(HEADER_BYTES, length));
-
-      // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-      buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-      try {
-          channel.write(buffer, currentPosition);
-      } catch (IOException e) {
-          throw new StorageException(e);
-      }
-
-      // Update the last entry with the correct index/term/length.
-      final var indexedEntry = new Indexed<E>(index, entry, length);
-      lastEntry = indexedEntry;
-      this.index.index(index, currentPosition);
-
-      currentPosition = currentPosition + HEADER_BYTES + length;
-      return (Indexed<T>) indexedEntry;
-  }
-
-  @Override
-  void writeEmptyHeader(final int position) {
-    try {
-      channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
-    } catch (IOException e) {
-      throw new StorageException(e);
+    private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+
+    private final JournalSegmentReader<E> reader;
+    private final ByteBuffer buffer;
+
+    DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+        final JournalIndex index, final JournalSerdes namespace) {
+        super(channel, segment, maxEntrySize, index, namespace);
+
+        buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+        reader = new JournalSegmentReader<>(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+        reset(0);
+    }
+
+    DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+        super(previous);
+
+        buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+        reader = new JournalSegmentReader<>(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+    }
+
+    @Override
+    MappedByteBuffer buffer() {
+        return null;
+    }
+
+    @Override
+    MappedJournalSegmentWriter<E> toMapped() {
+        return new MappedJournalSegmentWriter<>(this);
+    }
+
+    @Override
+    DiskJournalSegmentWriter<E> toFileChannel() {
+        return this;
+    }
+
+    @Override
+    JournalSegmentReader<E> reader() {
+        return reader;
     }
-  }
-
-  @Override
-  void flush() {
-    try {
-      if (channel.isOpen()) {
-        channel.force(true);
-      }
-    } catch (IOException e) {
-      throw new StorageException(e);
+
+    @Override
+    ByteBuffer startWrite(final int position, final int size) {
+        return buffer.clear().slice(0, size);
+    }
+
+    @Override
+    void commitWrite(final int position, final ByteBuffer entry) {
+        try {
+            channel.write(entry, position);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
-  }
 
-  @Override
-  void close() {
-    flush();
-  }
+    @Override
+    void writeEmptyHeader(final int position) {
+        try {
+            channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    void flush() {
+        try {
+            if (channel.isOpen()) {
+                channel.force(true);
+            }
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    void close() {
+        flush();
+    }
 }
index af633a248ffdc94845515770d68aeb87fe4550e8..df8ca35068b53cafd3ca96083c4ef71135d9ff63 100644 (file)
@@ -18,23 +18,26 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
+import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
     final @NonNull FileChannel channel;
     final @NonNull JournalSegment<E> segment;
-    final @NonNull JournalIndex index;
+    private final @NonNull JournalIndex index;
     final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
     final int maxEntrySize;
 
-    // FIXME: hide these two fields
-    Indexed<E> lastEntry;
-    int currentPosition;
+    private Indexed<E> lastEntry;
+    private int currentPosition;
 
     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
             final JournalIndex index, final JournalSerdes namespace) {
@@ -90,7 +93,57 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @param entry The entry to append.
      * @return The appended indexed entry.
      */
-    abstract <T extends E> Indexed<T> append(T entry);
+    final <T extends E> Indexed<T> append(final T entry) {
+        // Store the entry index.
+        final long index = getNextIndex();
+        final int position = currentPosition;
+
+        // Serialize the entry.
+        final int bodyPosition = position + HEADER_BYTES;
+        final int avail = maxSegmentSize - bodyPosition;
+        if (avail < 0) {
+          throw new BufferOverflowException();
+        }
+
+        final var writeLimit = Math.min(avail, maxEntrySize);
+        final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
+        try {
+            namespace.serialize(entry, diskEntry);
+        } catch (KryoException e) {
+            if (writeLimit != maxEntrySize) {
+                // We have not provided enough capacity, signal to roll to next segment
+                throw new BufferOverflowException();
+            }
+
+            // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
+            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+        }
+
+        final int length = diskEntry.position() - HEADER_BYTES;
+
+        // Compute the checksum for the entry.
+        final var crc32 = new CRC32();
+        crc32.update(diskEntry.flip().position(HEADER_BYTES));
+
+        // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+        diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+        commitWrite(position, diskEntry.rewind());
+
+        // Update the last entry with the correct index/term/length.
+        final var indexedEntry = new Indexed<E>(index, entry, length);
+        lastEntry = indexedEntry;
+        this.index.index(index, position);
+
+        currentPosition = bodyPosition + length;
+
+        @SuppressWarnings("unchecked")
+        final var ugly = (Indexed<T>) indexedEntry;
+        return ugly;
+    }
+
+    abstract ByteBuffer startWrite(int position, int size);
+
+    abstract void commitWrite(int position, ByteBuffer entry);
 
     /**
      * Resets the head of the segment to the given index.
@@ -141,7 +194,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     final void truncate(final long index) {
         // If the index is greater than or equal to the last index, skip the truncate.
         if (index >= getLastIndex()) {
-          return;
+            return;
         }
 
         // Reset the last entry.
@@ -151,11 +204,11 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         this.index.truncate(index);
 
         if (index < segment.firstIndex()) {
-          // Reset the writer to the first entry.
-          currentPosition = JournalSegmentDescriptor.BYTES;
+            // Reset the writer to the first entry.
+            currentPosition = JournalSegmentDescriptor.BYTES;
         } else {
-          // Reset the writer to the given index.
-          reset(index);
+            // Reset the writer to the given index.
+            reset(index);
         }
 
         // Zero the entry header at current channel position.
index ffb08d29e1ba7f4788088e13b18d849ac5294705..00dd4c6cecab4bc20616d1823ecd5dba1571874b 100644 (file)
  */
 package io.atomix.storage.journal;
 
-import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
-
-import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
@@ -44,121 +39,87 @@ import org.eclipse.jdt.annotation.NonNull;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
-  private final @NonNull MappedByteBuffer mappedBuffer;
-  private final JournalSegmentReader<E> reader;
-  private final ByteBuffer buffer;
-
-  MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-      final JournalIndex index, final JournalSerdes namespace) {
-    super(channel, segment, maxEntrySize, index, namespace);
-
-    mappedBuffer = mapBuffer(channel, maxSegmentSize);
-    buffer = mappedBuffer.slice();
-    reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
-        maxEntrySize, namespace);
-    reset(0);
-  }
-
-  MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
-    super(previous);
-
-    mappedBuffer = mapBuffer(channel, maxSegmentSize);
-    buffer = mappedBuffer.slice();
-    reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
-        maxEntrySize, namespace);
-  }
-
-  private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
-    try {
-      return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
-    } catch (IOException e) {
-      throw new StorageException(e);
+    private final @NonNull MappedByteBuffer mappedBuffer;
+    private final JournalSegmentReader<E> reader;
+    private final ByteBuffer buffer;
+
+    MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+        final JournalIndex index, final JournalSerdes namespace) {
+        super(channel, segment, maxEntrySize, index, namespace);
+
+        mappedBuffer = mapBuffer(channel, maxSegmentSize);
+        buffer = mappedBuffer.slice();
+        reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+            maxEntrySize, namespace);
+        reset(0);
+    }
+
+    MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+        super(previous);
+
+        mappedBuffer = mapBuffer(channel, maxSegmentSize);
+        buffer = mappedBuffer.slice();
+        reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+            maxEntrySize, namespace);
+    }
+
+    private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
+        try {
+            return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    @NonNull MappedByteBuffer buffer() {
+        return mappedBuffer;
+    }
+
+    @Override
+    MappedJournalSegmentWriter<E> toMapped() {
+        return this;
     }
-  }
-
-  @Override
-  @NonNull MappedByteBuffer buffer() {
-    return mappedBuffer;
-  }
-
-  @Override
-  MappedJournalSegmentWriter<E> toMapped() {
-    return this;
-  }
-
-  @Override
-  DiskJournalSegmentWriter<E> toFileChannel() {
-    close();
-    return new DiskJournalSegmentWriter<>(this);
-  }
-
-  @Override
-  JournalSegmentReader<E> reader() {
-    return reader;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(final T entry) {
-    // Store the entry index.
-    final long index = getNextIndex();
-
-    // Serialize the entry.
-    final int bodyPosition = currentPosition + HEADER_BYTES;
-    final int avail = maxSegmentSize - bodyPosition;
-    if (avail < 0) {
-      throw new BufferOverflowException();
+
+    @Override
+    DiskJournalSegmentWriter<E> toFileChannel() {
+        close();
+        return new DiskJournalSegmentWriter<>(this);
+    }
+
+    @Override
+    JournalSegmentReader<E> reader() {
+        return reader;
+    }
+
+    @Override
+    ByteBuffer startWrite(final int position, final int size) {
+        return buffer.slice(position, size);
+    }
+
+    @Override
+    void commitWrite(final int position, final ByteBuffer entry) {
+        // No-op, buffer is write-through
+    }
+
+    @Override
+    void writeEmptyHeader(final int position) {
+        // Note: we issue a single putLong() instead of two putInt()s.
+        buffer.putLong(position, 0L);
     }
 
-    final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize));
-    try {
-      namespace.serialize(entry, entryBytes);
-    } catch (KryoException e) {
-      if (entryBytes.capacity() != maxEntrySize) {
-        // We have not provided enough capacity, signal to roll to next segment
-        throw new BufferOverflowException();
-      }
-
-      // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
-      throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+    @Override
+    void flush() {
+        mappedBuffer.force();
     }
 
-    final int length = entryBytes.position();
-
-    // Compute the checksum for the entry.
-    final var crc32 = new CRC32();
-    crc32.update(entryBytes.flip());
-
-    // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-    buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue());
-
-    // Update the last entry with the correct index/term/length.
-    Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
-    lastEntry = indexedEntry;
-    this.index.index(index, currentPosition);
-
-    currentPosition = currentPosition + HEADER_BYTES + length;
-    return (Indexed<T>) indexedEntry;
-  }
-
-  @Override
-  void writeEmptyHeader(final int position) {
-      // Note: we issue a single putLong() instead of two putInt()s.
-      buffer.putLong(position, 0L);
-  }
-
-  @Override
-  void flush() {
-    mappedBuffer.force();
-  }
-
-  @Override
-  void close() {
-    flush();
-    try {
-      BufferCleaner.freeBuffer(mappedBuffer);
-    } catch (IOException e) {
-      throw new StorageException(e);
+    @Override
+    void close() {
+        flush();
+        try {
+            BufferCleaner.freeBuffer(mappedBuffer);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
-  }
 }