Centralize JournalSegmentWriter.append()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
index af633a248ffdc94845515770d68aeb87fe4550e8..df8ca35068b53cafd3ca96083c4ef71135d9ff63 100644 (file)
@@ -18,23 +18,26 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
+import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
 abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
     final @NonNull FileChannel channel;
     final @NonNull JournalSegment<E> segment;
-    final @NonNull JournalIndex index;
+    private final @NonNull JournalIndex index;
     final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
     final int maxEntrySize;
 
-    // FIXME: hide these two fields
-    Indexed<E> lastEntry;
-    int currentPosition;
+    private Indexed<E> lastEntry;
+    private int currentPosition;
 
     JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
             final JournalIndex index, final JournalSerdes namespace) {
@@ -90,7 +93,57 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @param entry The entry to append.
      * @return The appended indexed entry.
      */
-    abstract <T extends E> Indexed<T> append(T entry);
+    final <T extends E> Indexed<T> append(final T entry) {
+        // Store the entry index.
+        final long index = getNextIndex();
+        final int position = currentPosition;
+
+        // Serialize the entry.
+        final int bodyPosition = position + HEADER_BYTES;
+        final int avail = maxSegmentSize - bodyPosition;
+        if (avail < 0) {
+          throw new BufferOverflowException();
+        }
+
+        final var writeLimit = Math.min(avail, maxEntrySize);
+        final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
+        try {
+            namespace.serialize(entry, diskEntry);
+        } catch (KryoException e) {
+            if (writeLimit != maxEntrySize) {
+                // We have not provided enough capacity, signal to roll to next segment
+                throw new BufferOverflowException();
+            }
+
+            // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
+            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+        }
+
+        final int length = diskEntry.position() - HEADER_BYTES;
+
+        // Compute the checksum for the entry.
+        final var crc32 = new CRC32();
+        crc32.update(diskEntry.flip().position(HEADER_BYTES));
+
+        // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+        diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+        commitWrite(position, diskEntry.rewind());
+
+        // Update the last entry with the correct index/term/length.
+        final var indexedEntry = new Indexed<E>(index, entry, length);
+        lastEntry = indexedEntry;
+        this.index.index(index, position);
+
+        currentPosition = bodyPosition + length;
+
+        @SuppressWarnings("unchecked")
+        final var ugly = (Indexed<T>) indexedEntry;
+        return ugly;
+    }
+
+    abstract ByteBuffer startWrite(int position, int size);
+
+    abstract void commitWrite(int position, ByteBuffer entry);
 
     /**
      * Resets the head of the segment to the given index.
@@ -141,7 +194,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
     final void truncate(final long index) {
         // If the index is greater than or equal to the last index, skip the truncate.
         if (index >= getLastIndex()) {
-          return;
+            return;
         }
 
         // Reset the last entry.
@@ -151,11 +204,11 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         this.index.truncate(index);
 
         if (index < segment.firstIndex()) {
-          // Reset the writer to the first entry.
-          currentPosition = JournalSegmentDescriptor.BYTES;
+            // Reset the writer to the first entry.
+            currentPosition = JournalSegmentDescriptor.BYTES;
         } else {
-          // Reset the writer to the given index.
-          reset(index);
+            // Reset the writer to the given index.
+            reset(index);
         }
 
         // Zero the entry header at current channel position.