Move entry serialization back to ByteBufWriter
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
index 0ffe8bea6c2909cf887d4a2e7f84ddef1e81bfd7..33f596f496604b90a4f88c7f93c803ff097d1452 100644 (file)
@@ -20,8 +20,8 @@ import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
@@ -71,39 +71,60 @@ final class JournalSegmentWriter {
     /**
      * Tries to append a binary data to the journal.
      *
-     * @param buf binary data to append
-     * @return The index of appended data, or {@code null} if segment has no space
+     * @param mapper the mapper to use
+     * @param entry the entry
+     * @return the entry size, or {@code null} if segment has no space
      */
-    Position append(final ByteBuf buf) {
-        final var length = buf.readableBytes();
-        if (length > maxEntrySize) {
-            throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-        }
-
-        // Store the entry index.
+    <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+        // we are appending at this index and position
         final long index = nextIndex();
         final int position = currentPosition;
 
-        // check space available
-        final int nextPosition = position + HEADER_BYTES + length;
-        if (nextPosition >= maxSegmentSize) {
+        // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
+        // way smaller than that.
+        final int bodyPosition = position + HEADER_BYTES;
+        final int avail = maxSegmentSize - bodyPosition;
+        if (avail <= 0) {
+            // we do not have enough space for the header and a byte: signal a retry
             LOG.trace("Not enough space for {} at {}", index, position);
             return null;
         }
 
-        // allocate buffer and write data
-        final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
-        writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
+        // Entry must not exceed maxEntrySize
+        final var writeLimit = Math.min(avail, maxEntrySize);
+
+        // Allocate entry space
+        final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
+        // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
+        final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
+        try {
+            mapper.objectToBytes(entry, bytes);
+        } catch (IOException e) {
+            // We ran out of buffer space: let's decide who's fault it is:
+            if (writeLimit == maxEntrySize) {
+                // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
+                //   fault. This is as good as it gets.
+                throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
+            }
+
+            // - it is us, as we do not have the capacity to hold maxEntrySize bytes
+            LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+            return null;
+        }
 
-        // Compute the checksum for the entry.
-        final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
+        // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
+        // the buffer, as we rewind() it back.
+        final var length = bytes.readableBytes();
+        final var checksum = SegmentEntry.computeChecksum(
+            diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
 
-        // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-        fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
+        // update the header and commit entry to file
+        fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
 
         // Update the last entry with the correct index/term/length.
-        currentPosition = nextPosition;
-        return journalIndex.index(index, position);
+        currentPosition = bodyPosition + length;
+        journalIndex.index(index, position);
+        return length;
     }
 
     /**