From fd9dfdc05c75e078b2d4c65d48dd48fdf10b8fe4 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 27 Mar 2024 00:04:35 +0100 Subject: [PATCH 1/1] Centralize JournalSegmentWriter.append() We have almost-identical implementations of append(), differing only slightly in how they do buffer management. Expose a ByteBuffer-based API for writing to the segment file, in terms of abstract methods and pull the implementation down into the superclass. JIRA: CONTROLLER-2100 Change-Id: I8d5629019d1c5d2978b0861e30e80346fbfc3c31 Signed-off-by: Robert Varga --- .../journal/DiskJournalSegmentWriter.java | 192 +++++++---------- .../storage/journal/JournalSegmentWriter.java | 73 ++++++- .../journal/MappedJournalSegmentWriter.java | 193 +++++++----------- 3 files changed, 218 insertions(+), 240 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java index d3aa3332c7..266320127b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java @@ -18,15 +18,11 @@ package io.atomix.storage.journal; import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; -import com.esotericsoftware.kryo.KryoException; -import io.atomix.storage.journal.StorageException.TooLarge; import io.atomix.storage.journal.index.JournalIndex; import java.io.IOException; -import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.util.zip.CRC32; /** * Segment writer. 
@@ -44,117 +40,85 @@ import java.util.zip.CRC32; * @author Jordan Halterman */ final class DiskJournalSegmentWriter extends JournalSegmentWriter { - private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); - - private final JournalSegmentReader reader; - private final ByteBuffer buffer; - - DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, - final JournalIndex index, final JournalSerdes namespace) { - super(channel, segment, maxEntrySize, index, namespace); - - buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); - reader = new JournalSegmentReader<>(segment, - new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace); - reset(0); - } - - DiskJournalSegmentWriter(final JournalSegmentWriter previous) { - super(previous); - - buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); - reader = new JournalSegmentReader<>(segment, - new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace); - } - - @Override - MappedByteBuffer buffer() { - return null; - } - - @Override - MappedJournalSegmentWriter toMapped() { - return new MappedJournalSegmentWriter<>(this); - } - - @Override - DiskJournalSegmentWriter toFileChannel() { - return this; - } - - @Override - JournalSegmentReader reader() { - return reader; - } - - @Override - @SuppressWarnings("unchecked") - Indexed append(final T entry) { - // Store the entry index. - final long index = getNextIndex(); - - // Serialize the entry. - try { - namespace.serialize(entry, buffer.clear().position(HEADER_BYTES)); - } catch (KryoException e) { - throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - buffer.flip(); - - final int length = buffer.limit() - HEADER_BYTES; - // Ensure there's enough space left in the buffer to store the entry. 
- if (maxSegmentSize - currentPosition < length + HEADER_BYTES) { - throw new BufferOverflowException(); - } - - // If the entry length exceeds the maximum entry size then throw an exception. - if (length > maxEntrySize) { - throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")"); - } - - // Compute the checksum for the entry. - final var crc32 = new CRC32(); - crc32.update(buffer.slice(HEADER_BYTES, length)); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); - try { - channel.write(buffer, currentPosition); - } catch (IOException e) { - throw new StorageException(e); - } - - // Update the last entry with the correct index/term/length. - final var indexedEntry = new Indexed(index, entry, length); - lastEntry = indexedEntry; - this.index.index(index, currentPosition); - - currentPosition = currentPosition + HEADER_BYTES + length; - return (Indexed) indexedEntry; - } - - @Override - void writeEmptyHeader(final int position) { - try { - channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position); - } catch (IOException e) { - throw new StorageException(e); + private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); + + private final JournalSegmentReader reader; + private final ByteBuffer buffer; + + DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index, final JournalSerdes namespace) { + super(channel, segment, maxEntrySize, index, namespace); + + buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); + reader = new JournalSegmentReader<>(segment, + new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace); + reset(0); + } + + DiskJournalSegmentWriter(final JournalSegmentWriter previous) { + super(previous); + + buffer = 
DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); + reader = new JournalSegmentReader<>(segment, + new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace); + } + + @Override + MappedByteBuffer buffer() { + return null; + } + + @Override + MappedJournalSegmentWriter toMapped() { + return new MappedJournalSegmentWriter<>(this); + } + + @Override + DiskJournalSegmentWriter toFileChannel() { + return this; + } + + @Override + JournalSegmentReader reader() { + return reader; } - } - - @Override - void flush() { - try { - if (channel.isOpen()) { - channel.force(true); - } - } catch (IOException e) { - throw new StorageException(e); + + @Override + ByteBuffer startWrite(final int position, final int size) { + return buffer.clear().slice(0, size); + } + + @Override + void commitWrite(final int position, final ByteBuffer entry) { + try { + channel.write(entry, position); + } catch (IOException e) { + throw new StorageException(e); + } } - } - @Override - void close() { - flush(); - } + @Override + void writeEmptyHeader(final int position) { + try { + channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position); + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + void flush() { + try { + if (channel.isOpen()) { + channel.force(true); + } + } catch (IOException e) { + throw new StorageException(e); + } + } + + @Override + void close() { + flush(); + } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index af633a248f..df8ca35068 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -18,23 +18,26 @@ package io.atomix.storage.journal; import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import static 
java.util.Objects.requireNonNull; +import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.util.zip.CRC32; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter { final @NonNull FileChannel channel; final @NonNull JournalSegment segment; - final @NonNull JournalIndex index; + private final @NonNull JournalIndex index; final @NonNull JournalSerdes namespace; final int maxSegmentSize; final int maxEntrySize; - // FIXME: hide these two fields - Indexed lastEntry; - int currentPosition; + private Indexed lastEntry; + private int currentPosition; JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, final JournalIndex index, final JournalSerdes namespace) { @@ -90,7 +93,57 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, * @param entry The entry to append. * @return The appended indexed entry. */ - abstract Indexed append(T entry); + final Indexed append(final T entry) { + // Store the entry index. + final long index = getNextIndex(); + final int position = currentPosition; + + // Serialize the entry. 
+ final int bodyPosition = position + HEADER_BYTES; + final int avail = maxSegmentSize - bodyPosition; + if (avail < 0) { + throw new BufferOverflowException(); + } + + final var writeLimit = Math.min(avail, maxEntrySize); + final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES); + try { + namespace.serialize(entry, diskEntry); + } catch (KryoException e) { + if (writeLimit != maxEntrySize) { + // We have not provided enough capacity, signal to roll to next segment + throw new BufferOverflowException(); + } + + // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum. + throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + } + + final int length = diskEntry.position() - HEADER_BYTES; + + // Compute the checksum for the entry. + final var crc32 = new CRC32(); + crc32.update(diskEntry.flip().position(HEADER_BYTES)); + + // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. + diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); + commitWrite(position, diskEntry.rewind()); + + // Update the last entry with the correct index/term/length. + final var indexedEntry = new Indexed(index, entry, length); + lastEntry = indexedEntry; + this.index.index(index, position); + + currentPosition = bodyPosition + length; + + @SuppressWarnings("unchecked") + final var ugly = (Indexed) indexedEntry; + return ugly; + } + + abstract ByteBuffer startWrite(int position, int size); + + abstract void commitWrite(int position, ByteBuffer entry); /** * Resets the head of the segment to the given index. @@ -141,7 +194,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, final void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. 
if (index >= getLastIndex()) { - return; + return; } // Reset the last entry. @@ -151,11 +204,11 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, this.index.truncate(index); if (index < segment.firstIndex()) { - // Reset the writer to the first entry. - currentPosition = JournalSegmentDescriptor.BYTES; + // Reset the writer to the first entry. + currentPosition = JournalSegmentDescriptor.BYTES; } else { - // Reset the writer to the given index. - reset(index); + // Reset the writer to the given index. + reset(index); } // Zero the entry header at current channel position. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index ffb08d29e1..00dd4c6cec 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -16,16 +16,11 @@ */ package io.atomix.storage.journal; -import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; - -import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; import java.io.IOException; -import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import java.util.zip.CRC32; import org.eclipse.jdt.annotation.NonNull; /** @@ -44,121 +39,87 @@ import org.eclipse.jdt.annotation.NonNull; * @author Jordan Halterman */ final class MappedJournalSegmentWriter extends JournalSegmentWriter { - private final @NonNull MappedByteBuffer mappedBuffer; - private final JournalSegmentReader reader; - private final ByteBuffer buffer; - - MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, - final JournalIndex index, final JournalSerdes namespace) { - super(channel, segment, 
maxEntrySize, index, namespace); - - mappedBuffer = mapBuffer(channel, maxSegmentSize); - buffer = mappedBuffer.slice(); - reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), - maxEntrySize, namespace); - reset(0); - } - - MappedJournalSegmentWriter(final JournalSegmentWriter previous) { - super(previous); - - mappedBuffer = mapBuffer(channel, maxSegmentSize); - buffer = mappedBuffer.slice(); - reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), - maxEntrySize, namespace); - } - - private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { - try { - return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); - } catch (IOException e) { - throw new StorageException(e); + private final @NonNull MappedByteBuffer mappedBuffer; + private final JournalSegmentReader reader; + private final ByteBuffer buffer; + + MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index, final JournalSerdes namespace) { + super(channel, segment, maxEntrySize, index, namespace); + + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); + reset(0); + } + + MappedJournalSegmentWriter(final JournalSegmentWriter previous) { + super(previous); + + mappedBuffer = mapBuffer(channel, maxSegmentSize); + buffer = mappedBuffer.slice(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); + } + + private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { + try { + return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); + } catch 
(IOException e) { + throw new StorageException(e); + } + } + + @Override + @NonNull MappedByteBuffer buffer() { + return mappedBuffer; + } + + @Override + MappedJournalSegmentWriter toMapped() { + return this; } - } - - @Override - @NonNull MappedByteBuffer buffer() { - return mappedBuffer; - } - - @Override - MappedJournalSegmentWriter toMapped() { - return this; - } - - @Override - DiskJournalSegmentWriter toFileChannel() { - close(); - return new DiskJournalSegmentWriter<>(this); - } - - @Override - JournalSegmentReader reader() { - return reader; - } - - @Override - @SuppressWarnings("unchecked") - Indexed append(final T entry) { - // Store the entry index. - final long index = getNextIndex(); - - // Serialize the entry. - final int bodyPosition = currentPosition + HEADER_BYTES; - final int avail = maxSegmentSize - bodyPosition; - if (avail < 0) { - throw new BufferOverflowException(); + + @Override + DiskJournalSegmentWriter toFileChannel() { + close(); + return new DiskJournalSegmentWriter<>(this); + } + + @Override + JournalSegmentReader reader() { + return reader; + } + + @Override + ByteBuffer startWrite(final int position, final int size) { + return buffer.slice(position, size); + } + + @Override + void commitWrite(final int position, final ByteBuffer entry) { + // No-op, buffer is write-through + } + + @Override + void writeEmptyHeader(final int position) { + // Note: we issue a single putLong() instead of two putInt()s. + buffer.putLong(position, 0L); } - final var entryBytes = buffer.slice(bodyPosition, Math.min(avail, maxEntrySize)); - try { - namespace.serialize(entry, entryBytes); - } catch (KryoException e) { - if (entryBytes.capacity() != maxEntrySize) { - // We have not provided enough capacity, signal to roll to next segment - throw new BufferOverflowException(); - } - - // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum. 
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); + @Override + void flush() { + mappedBuffer.force(); } - final int length = entryBytes.position(); - - // Compute the checksum for the entry. - final var crc32 = new CRC32(); - crc32.update(entryBytes.flip()); - - // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - buffer.putInt(currentPosition, length).putInt(currentPosition + Integer.BYTES, (int) crc32.getValue()); - - // Update the last entry with the correct index/term/length. - Indexed indexedEntry = new Indexed<>(index, entry, length); - lastEntry = indexedEntry; - this.index.index(index, currentPosition); - - currentPosition = currentPosition + HEADER_BYTES + length; - return (Indexed) indexedEntry; - } - - @Override - void writeEmptyHeader(final int position) { - // Note: we issue a single putLong() instead of two putInt()s. - buffer.putLong(position, 0L); - } - - @Override - void flush() { - mappedBuffer.force(); - } - - @Override - void close() { - flush(); - try { - BufferCleaner.freeBuffer(mappedBuffer); - } catch (IOException e) { - throw new StorageException(e); + @Override + void close() { + flush(); + try { + BufferCleaner.freeBuffer(mappedBuffer); + } catch (IOException e) { + throw new StorageException(e); + } } - } } -- 2.36.6