Move entry serialization back to ByteBufWriter 51/111651/12
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2024 20:29:31 +0000 (22:29 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 8 May 2024 23:41:09 +0000 (23:41 +0000)
I8f6bac3192a0f38b627150be4c8ea128f1e233e5 moved serialization to heap,
causing unnecessary copies, while nominally simplifying the interface.

This patch undoes that move, restoring the logic, except working on top
of a ByteBuf. This requires a bit more logic to deal with the fact we
are no longer writing to the diskEntry nor are we flipping it.

JIRA: CONTROLLER-2115
Change-Id: I1d18f99cfdb5b7e6c6548a5833c824af9f31c166
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java

index cabd48d8bd7d8d428f91ad8d388ca869dd837d2f..a0f6f804494f0e679ac7ebd7a0ccb66d8f5933aa 100644 (file)
 package io.atomix.storage.journal;
 
 import io.netty.buffer.ByteBuf;
+import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
- * Support for serialization of {@link ByteBufJournal} entries.
+ * Support for mapping of {@link ByteBufJournal} entries to and from {@link ByteBuf}s.
  */
 @NonNullByDefault
 public interface ByteBufMapper<T> {
     /**
-     * Converts an object into a series of bytes in a {@link ByteBuf}.
+     * Converts the contents of a {@link ByteBuf} to an object.
      *
-     * @param obj the object
-     * @return resulting buffer
+     * @param index entry index
+     * @param bytes entry bytes
+     * @return resulting object
      */
-    ByteBuf objectToBytes(T obj) ;
+    T bytesToObject(final long index, ByteBuf bytes);
 
     /**
-     * Converts the contents of a {@link ByteBuf} to an object.
+     * Converts an object into a series of bytes in the specified {@link ByteBuf}.
      *
-     * @param buf buffer to convert
-     * @return resulting object
+     * @param obj the object
+     * @param buf target buffer
+     * @throws IOException if an I/O error occurs
      */
-    T bytesToObject(ByteBuf buf);
+    void objectToBytes(T obj, ByteBuf buf) throws IOException;
 }
index 910759cd17ab565d5bd116d1744c2d5aa7164c46..c092f20d50ea4adddce40871775deb2ae0720934 100644 (file)
@@ -15,7 +15,6 @@
  */
 package io.atomix.storage.journal;
 
-import io.netty.buffer.ByteBuf;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
@@ -33,11 +32,12 @@ public interface ByteBufWriter {
     /**
      * Appends an entry to the journal.
      *
-     * @param bytes Data block to append
-     * @return The index of appended data block
+     * @param mapper a {@link ByteBufMapper} to use with entry
+     * @param entry entry to append
+     * @return the on-disk size of the entry
      */
     // FIXME: throws IOException
-    long append(ByteBuf bytes);
+    <T> int append(ByteBufMapper<T> mapper, T entry);
 
     /**
      * Commits entries up to the given index.
index a677fc3d26e75d2e0bbfbd7d793ca3dab77dc929..fc9ef64fe3097eb87e194ab4f1e03611ba96de79 100644 (file)
@@ -48,6 +48,14 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
      */
     abstract void writeEmptyHeader(int position);
 
+    /**
+     * Allocate file space. Note that the allocated space may be a buffer disconnected from the file. Any modifications
+     * to the returned buffer need to be committed via {@link #commitWrite(int, ByteBuffer)}.
+     *
+     * @param position position to start from
+     * @param size the size to allocate
+     * @return A {@link ByteBuffer} covering the allocated area
+     */
     abstract ByteBuffer startWrite(int position, int size);
 
     abstract void commitWrite(int position, ByteBuffer entry);
index 0ffe8bea6c2909cf887d4a2e7f84ddef1e81bfd7..33f596f496604b90a4f88c7f93c803ff097d1452 100644 (file)
@@ -20,8 +20,8 @@ import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
@@ -71,39 +71,60 @@ final class JournalSegmentWriter {
     /**
      * Tries to append a binary data to the journal.
      *
-     * @param buf binary data to append
-     * @return The index of appended data, or {@code null} if segment has no space
+     * @param mapper the mapper to use
+     * @param entry the entry
+     * @return the entry size, or {@code null} if segment has no space
      */
-    Position append(final ByteBuf buf) {
-        final var length = buf.readableBytes();
-        if (length > maxEntrySize) {
-            throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-        }
-
-        // Store the entry index.
+    <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+        // we are appending at this index and position
         final long index = nextIndex();
         final int position = currentPosition;
 
-        // check space available
-        final int nextPosition = position + HEADER_BYTES + length;
-        if (nextPosition >= maxSegmentSize) {
+        // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
+        // way smaller than that.
+        final int bodyPosition = position + HEADER_BYTES;
+        final int avail = maxSegmentSize - bodyPosition;
+        if (avail <= 0) {
+            // we do not have enough space for the header and a byte: signal a retry
             LOG.trace("Not enough space for {} at {}", index, position);
             return null;
         }
 
-        // allocate buffer and write data
-        final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
-        writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
+        // Entry must not exceed maxEntrySize
+        final var writeLimit = Math.min(avail, maxEntrySize);
+
+        // Allocate entry space
+        final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
+        // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
+        final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
+        try {
+            mapper.objectToBytes(entry, bytes);
+        } catch (IOException e) {
+            // We ran out of buffer space: let's decide who's fault it is:
+            if (writeLimit == maxEntrySize) {
+                // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
+                //   fault. This is as good as it gets.
+                throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
+            }
+
+            // - it is us, as we do not have the capacity to hold maxEntrySize bytes
+            LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+            return null;
+        }
 
-        // Compute the checksum for the entry.
-        final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
+        // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
+        // the buffer, as we rewind() it back.
+        final var length = bytes.readableBytes();
+        final var checksum = SegmentEntry.computeChecksum(
+            diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
 
-        // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-        fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
+        // update the header and commit entry to file
+        fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
 
         // Update the last entry with the correct index/term/length.
-        currentPosition = nextPosition;
-        return journalIndex.index(index, position);
+        currentPosition = bodyPosition + length;
+        journalIndex.index(index, position);
+        return length;
     }
 
     /**
index 29b5bed7ab6ac75c570ca1187e614463e2680613..bf8518de2f8462bc77e83bd83ac64e1cd80e2e27 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import com.esotericsoftware.kryo.KryoException;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -120,13 +120,21 @@ public interface JournalSerdes {
     default <T> ByteBufMapper<T> toMapper() {
         return new ByteBufMapper<>() {
             @Override
-            public ByteBuf objectToBytes(final T obj) {
-                return Unpooled.wrappedBuffer(serialize(obj));
+            public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException {
+                final var buffer = bytes.nioBuffer();
+                try {
+                    serialize(obj, buffer);
+                } catch (KryoException e) {
+                    throw new IOException(e);
+                } finally {
+                    // adjust writerIndex so that readableBytes() the bytes written
+                    bytes.writerIndex(bytes.readerIndex() + buffer.position());
+                }
             }
 
             @Override
-            public T bytesToObject(final ByteBuf buf) {
-                return deserialize(buf.nioBuffer());
+            public T bytesToObject(final long index, final ByteBuf bytes) {
+                return deserialize(bytes.nioBuffer());
             }
         };
     }
index 75ecc26afd0b494732c067bf11cac24ce799ada8..103d5cf7632365f1a4d5e032471394a016d12339 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
 
-import io.netty.buffer.ByteBuf;
-
 /**
  * A {@link ByteBufWriter} implementation.
  */
@@ -51,18 +50,18 @@ final class SegmentedByteBufWriter implements ByteBufWriter {
     }
 
     @Override
-    public long append(final ByteBuf bytes) {
-        final var position = currentWriter.append(bytes);
-        return position != null ? position.index() : appendToNextSegment(bytes);
+    public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
+        final var size = currentWriter.append(mapper, entry);
+        return size != null ? size : appendToNextSegment(mapper, entry);
     }
 
     //  Slow path: we do not have enough capacity
-    private long appendToNextSegment(final ByteBuf bytes) {
+    private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
         currentWriter.flush();
         currentSegment.releaseWriter();
         currentSegment = journal.createNextSegment();
         currentWriter = currentSegment.acquireWriter();
-        return currentWriter.append(bytes).index();
+        return verifyNotNull(currentWriter.append(mapper, entry));
     }
 
     @Override
index f28390c84b9f592d4098996124eb8b2e98ca0e5e..db0531acec56350076a538434c90e9f917e859aa 100644 (file)
@@ -18,11 +18,13 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * A {@link JournalReader} backed by a {@link ByteBufReader}.
  */
+@NonNullByDefault
 final class SegmentedJournalReader<E> implements JournalReader<E> {
     private final ByteBufMapper<E> mapper;
     private final ByteBufReader reader;
@@ -54,9 +56,10 @@ final class SegmentedJournalReader<E> implements JournalReader<E> {
 
     @Override
     public <T> @Nullable T tryNext(final EntryMapper<E, T> entryMapper) {
-        return reader.tryNext(
-            (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes()))
-        );
+        return reader.tryNext((index, buf) -> {
+            final var size = buf.readableBytes();
+            return requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(index, buf), size));
+        });
     }
 
     @Override
index a5e0737940da27faca0633d62dffc97de58f87e6..144babce761def9982ef9fe75d63305cf0b8cb98 100644 (file)
@@ -18,9 +18,12 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
 /**
  * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
  */
+@NonNullByDefault
 final class SegmentedJournalWriter<E> implements JournalWriter<E> {
     private final ByteBufMapper<E> mapper;
     private final ByteBufWriter writer;
@@ -42,8 +45,8 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
 
     @Override
     public <T extends E> Indexed<T> append(final T entry) {
-        final var buf = mapper.objectToBytes(entry);
-        return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+        final var index = writer.nextIndex();
+        return new Indexed<>(index, entry, writer.append(mapper, entry));
     }
 
     @Override