From 527547aa84eec5aabd13c8a740f786996493eb40 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 22:29:31 +0200 Subject: [PATCH] Move entry serialization back to ByteBufWriter I8f6bac3192a0f38b627150be4c8ea128f1e233e5 moved serialization to heap, causing unnecessary copies, while nominally simplifying the interface. This patch undoes that move, restoring the logic, except working on top of a ByteBuf. This requires a bit more logic to deal with the fact we are no longer writing to the diskEntry nor are we flipping it. JIRA: CONTROLLER-2115 Change-Id: I1d18f99cfdb5b7e6c6548a5833c824af9f31c166 Signed-off-by: Robert Varga --- .../atomix/storage/journal/ByteBufMapper.java | 21 +++--- .../atomix/storage/journal/ByteBufWriter.java | 8 +-- .../io/atomix/storage/journal/FileWriter.java | 8 +++ .../storage/journal/JournalSegmentWriter.java | 67 ++++++++++++------- .../atomix/storage/journal/JournalSerdes.java | 18 +++-- .../journal/SegmentedByteBufWriter.java | 13 ++-- .../journal/SegmentedJournalReader.java | 9 ++- .../journal/SegmentedJournalWriter.java | 7 +- 8 files changed, 98 insertions(+), 53 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java index cabd48d8bd..a0f6f80449 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java @@ -16,26 +16,29 @@ package io.atomix.storage.journal; import io.netty.buffer.ByteBuf; +import java.io.IOException; import org.eclipse.jdt.annotation.NonNullByDefault; /** - * Support for serialization of {@link ByteBufJournal} entries. + * Support for mapping of {@link ByteBufJournal} entries to and from {@link ByteBuf}s. */ @NonNullByDefault public interface ByteBufMapper { /** - * Converts an object into a series of bytes in a {@link ByteBuf}. + * Converts the contents of a {@link ByteBuf} to an object. * - * @param obj the object - * @return resulting buffer + * @param index entry index + * @param bytes entry bytes + * @return resulting object */ - ByteBuf objectToBytes(T obj) ; + T bytesToObject(final long index, ByteBuf bytes); /** - * Converts the contents of a {@link ByteBuf} to an object. + * Converts an object into a series of bytes in the specified {@link ByteBuf}. * - * @param buf buffer to convert - * @return resulting object + * @param obj the object + * @param buf target buffer + * @throws IOException if an I/O error occurs */ - T bytesToObject(ByteBuf buf); + void objectToBytes(T obj, ByteBuf buf) throws IOException; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java index 910759cd17..c092f20d50 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -15,7 +15,6 @@ */ package io.atomix.storage.journal; -import io.netty.buffer.ByteBuf; import org.eclipse.jdt.annotation.NonNullByDefault; /** @@ -33,11 +32,12 @@ public interface ByteBufWriter { /** * Appends an entry to the journal. * - * @param bytes Data block to append - * @return The index of appended data block + * @param mapper a {@link ByteBufMapper} to use with entry + * @param entry entry to append + * @return the on-disk size of the entry */ // FIXME: throws IOException - long append(ByteBuf bytes); + int append(ByteBufMapper mapper, T entry); /** * Commits entries up to the given index. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java index a677fc3d26..fc9ef64fe3 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java @@ -48,6 +48,14 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { */ abstract void writeEmptyHeader(int position); + /** + * Allocate file space. Note that the allocated space may be a buffer disconnected from the file. Any modifications + * to the returned buffer need to be committed via {@link #commitWrite(int, ByteBuffer)}. + * + * @param position position to start from + * @param size the size to allocate + * @return A {@link ByteBuffer} covering the allocated area + */ abstract ByteBuffer startWrite(int position, int size); abstract void commitWrite(int position, ByteBuffer entry); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 0ffe8bea6c..33f596f496 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -20,8 +20,8 @@ import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.StorageException.TooLarge; import io.atomix.storage.journal.index.JournalIndex; -import io.atomix.storage.journal.index.Position; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.IOException; import java.nio.MappedByteBuffer; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; @@ -71,39 +71,60 @@ final class JournalSegmentWriter { /** * Tries to append a binary data to the journal. * - * @param buf binary data to append - * @return The index of appended data, or {@code null} if segment has no space + * @param mapper the mapper to use + * @param entry the entry + * @return the entry size, or {@code null} if segment has no space */ - Position append(final ByteBuf buf) { - final var length = buf.readableBytes(); - if (length > maxEntrySize) { - throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - - // Store the entry index. + @Nullable Integer append(final ByteBufMapper mapper, final T entry) { + // we are appending at this index and position final long index = nextIndex(); final int position = currentPosition; - // check space available - final int nextPosition = position + HEADER_BYTES + length; - if (nextPosition >= maxSegmentSize) { + // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are + // way smaller than that. + final int bodyPosition = position + HEADER_BYTES; + final int avail = maxSegmentSize - bodyPosition; + if (avail <= 0) { + // we do not have enough space for the header and a byte: signal a retry LOG.trace("Not enough space for {} at {}", index, position); return null; } - // allocate buffer and write data - final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES); - writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length); + // Entry must not exceed maxEntrySize + final var writeLimit = Math.min(avail, maxEntrySize); + + // Allocate entry space + final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES); + // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent. + final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES)); + try { + mapper.objectToBytes(entry, bytes); + } catch (IOException e) { + // We ran out of buffer space: let's decide who's fault it is: + if (writeLimit == maxEntrySize) { + // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization + // fault. This is as good as it gets. + throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e); + } + + // - it is us, as we do not have the capacity to hold maxEntrySize bytes + LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e); + return null; + } - // Compute the checksum for the entry. - final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length)); + // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming + // the buffer, as we rewind() it back. + final var length = bytes.readableBytes(); + final var checksum = SegmentEntry.computeChecksum( + diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES)); - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum)); + // update the header and commit entry to file + fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum)); // Update the last entry with the correct index/term/length. - currentPosition = nextPosition; - return journalIndex.index(index, position); + currentPosition = bodyPosition + length; + journalIndex.index(index, position); + return length; } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java index 29b5bed7ab..bf8518de2f 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java @@ -16,11 +16,11 @@ */ package io.atomix.storage.journal; +import com.esotericsoftware.kryo.KryoException; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import io.atomix.utils.serializer.KryoJournalSerdesBuilder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -120,13 +120,21 @@ public interface JournalSerdes { default ByteBufMapper toMapper() { return new ByteBufMapper<>() { @Override - public ByteBuf objectToBytes(final T obj) { - return Unpooled.wrappedBuffer(serialize(obj)); + public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException { + final var buffer = bytes.nioBuffer(); + try { + serialize(obj, buffer); + } catch (KryoException e) { + throw new IOException(e); + } finally { + // adjust writerIndex so that readableBytes() the bytes written + bytes.writerIndex(bytes.readerIndex() + buffer.position()); + } } @Override - public T bytesToObject(final ByteBuf buf) { - return deserialize(buf.nioBuffer()); + public T bytesToObject(final long index, final ByteBuf bytes) { + return deserialize(bytes.nioBuffer()); } }; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 75ecc26afd..103d5cf763 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -16,10 +16,9 @@ */ package io.atomix.storage.journal; +import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; -import io.netty.buffer.ByteBuf; - /** * A {@link ByteBufWriter} implementation. */ @@ -51,18 +50,18 @@ final class SegmentedByteBufWriter implements ByteBufWriter { } @Override - public long append(final ByteBuf bytes) { - final var position = currentWriter.append(bytes); - return position != null ? position.index() : appendToNextSegment(bytes); + public int append(final ByteBufMapper mapper, final T entry) { + final var size = currentWriter.append(mapper, entry); + return size != null ? size : appendToNextSegment(mapper, entry); } // Slow path: we do not have enough capacity - private long appendToNextSegment(final ByteBuf bytes) { + private int appendToNextSegment(final ByteBufMapper mapper, final T entry) { currentWriter.flush(); currentSegment.releaseWriter(); currentSegment = journal.createNextSegment(); currentWriter = currentSegment.acquireWriter(); - return currentWriter.append(bytes).index(); + return verifyNotNull(currentWriter.append(mapper, entry)); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index f28390c84b..db0531acec 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -18,11 +18,13 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; /** * A {@link JournalReader} backed by a {@link ByteBufReader}. */ +@NonNullByDefault final class SegmentedJournalReader implements JournalReader { private final ByteBufMapper mapper; private final ByteBufReader reader; @@ -54,9 +56,10 @@ final class SegmentedJournalReader implements JournalReader { @Override public @Nullable T tryNext(final EntryMapper entryMapper) { - return reader.tryNext( - (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes())) - ); + return reader.tryNext((index, buf) -> { + final var size = buf.readableBytes(); + return requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(index, buf), size)); + }); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index a5e0737940..144babce76 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -18,9 +18,12 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; + /** * A {@link JournalWriter} backed by a {@link ByteBufWriter}. */ +@NonNullByDefault final class SegmentedJournalWriter implements JournalWriter { private final ByteBufMapper mapper; private final ByteBufWriter writer; @@ -42,8 +45,8 @@ final class SegmentedJournalWriter implements JournalWriter { @Override public Indexed append(final T entry) { - final var buf = mapper.objectToBytes(entry); - return new Indexed<>(writer.append(buf), entry, buf.readableBytes()); + final var index = writer.nextIndex(); + return new Indexed<>(index, entry, writer.append(mapper, entry)); } @Override -- 2.36.6