From 77d2e788c513715919e9421ff1aaecb880ace16f Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Thu, 18 Apr 2024 20:39:13 +0200
Subject: [PATCH] Add MappedByteBuf

Add MappedByteBuf and switch File{Access,Reader,Writer} to use ByteBufs
as their lingua franca. This reduces friction between the internal APIs
and the user-facing ones.

JIRA: CONTROLLER-2115
Change-Id: Id05744378e883cbfbf386407945a64e34282e213
Signed-off-by: Robert Varga
---
 .../storage/journal/DiskFileAccess.java       |  11 +-
 .../storage/journal/DiskFileReader.java       |  31 +-
 .../storage/journal/DiskFileWriter.java       |  17 +-
 .../io/atomix/storage/journal/FileReader.java |   4 +-
 .../io/atomix/storage/journal/FileWriter.java |  10 +-
 .../storage/journal/JournalSegmentFile.java   |  39 +-
 .../storage/journal/JournalSegmentReader.java |   9 +-
 .../storage/journal/JournalSegmentWriter.java |  16 +-
 .../atomix/storage/journal/MappedByteBuf.java | 413 ++++++++++++++++++
 .../storage/journal/MappedFileAccess.java     |  35 +-
 .../storage/journal/MappedFileReader.java     |  10 +-
 .../storage/journal/MappedFileWriter.java     |  12 +-
 .../journal/SegmentedByteBufJournal.java      |  32 +-
 13 files changed, 541 insertions(+), 98 deletions(-)
 create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/MappedByteBuf.java

diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java
index 75064b86f7..531e08963e 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileAccess.java
@@ -15,7 +15,7 @@
  */
 package io.atomix.storage.journal;
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
@@ -34,12 +34,12 @@ final class DiskFileAccess extends FileAccess {
 
     @Override
     DiskFileReader newFileReader() {
-        return new DiskFileReader(file, allocateBuffer(maxEntrySize, file.maxSize()));
+        return new DiskFileReader(file, allocateBuffer(file, maxEntrySize));
     }
 
     @Override
     DiskFileWriter newFileWriter() {
-        return new DiskFileWriter(file, maxEntrySize, allocateBuffer(maxEntrySize, file.maxSize()));
+        return new DiskFileWriter(file, maxEntrySize, allocateBuffer(file, maxEntrySize));
     }
 
     @Override
@@ -47,8 +47,9 @@ final class DiskFileAccess extends FileAccess {
         // No-op
     }
 
-    private static ByteBuffer allocateBuffer(final int maxEntrySize, final int maxSegmentSize) {
-        return ByteBuffer.allocate(chooseBufferSize(maxEntrySize, maxSegmentSize));
+    private static ByteBuf allocateBuffer(final JournalSegmentFile file, final int maxEntrySize) {
+        final var bufferSize = chooseBufferSize(maxEntrySize, file.maxSize());
+        return file.allocator().heapBuffer(bufferSize, bufferSize);
     }
 
     private static int chooseBufferSize(final int maxEntrySize, final int maxSegmentSize) {
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
index d5795eb980..197a16dc65 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java
@@ -16,9 +16,10 @@
 package io.atomix.storage.journal;
 
 import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import org.eclipse.jdt.annotation.NonNull;
@@ -27,38 +28,39 @@ import org.eclipse.jdt.annotation.NonNull;
  */
 final class DiskFileReader extends FileReader {
     private final FileChannel channel;
-    private final ByteBuffer buffer;
+    private final ByteBuf buffer;
 
     // tracks where memory's first available byte maps to in terms of FileChannel.position()
     private int bufferPosition;
 
     // Note: take ownership of the buffer
-    DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
+    DiskFileReader(final JournalSegmentFile file, final ByteBuf buffer) {
         super(file);
+        this.buffer = requireNonNull(buffer);
         channel = file.channel();
-        this.buffer = buffer.flip();
         bufferPosition = 0;
     }
 
     @Override
     void invalidateCache() {
-        buffer.clear().flip();
+        buffer.clear();
         bufferPosition = 0;
     }
 
     @Override
-    ByteBuffer read(final int position, final int size) {
+    ByteBuf read(final int position, final int size) {
        // calculate logical seek distance between buffer's first byte and position and split flow between
        // forward-moving and backwards-moving code paths.
         final int seek = bufferPosition - position;
         return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size);
     }
 
-    private @NonNull ByteBuffer forwardAndRead(final int seek, final int position, final int size) {
-        final int missing = buffer.limit() - seek - size;
+    private @NonNull ByteBuf forwardAndRead(final int seek, final int position, final int size) {
+        final int remaining = buffer.writerIndex() - seek;
+        final int missing = remaining - size;
         if (missing <= 0) {
             // fast path: we have the requested region
-            return buffer.slice(seek, size).asReadOnlyBuffer();
+            return buffer.slice(seek, size).asReadOnly();
         }
 
         // We need to read more data, but let's salvage what we can:
@@ -66,12 +68,12 @@ final class DiskFileReader extends FileReader {
         //   - run compact, which moves everything between position and limit onto the beginning of buffer and
         //     sets it up to receive more bytes
         //   - start the read accounting for the seek
-        buffer.position(seek).compact();
+        buffer.writeBytes(buffer, seek, remaining);
         readAtLeast(position + seek, missing);
         return setAndSlice(position, size);
     }
 
-    private @NonNull ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) {
+    private @NonNull ByteBuf rewindAndRead(final int rewindBy, final int position, final int size) {
         // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and
         //       do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and
         //       read it.
@@ -83,16 +85,15 @@ final class DiskFileReader extends FileReader {
     private void readAtLeast(final int readPosition, final int readAtLeast) {
         final int bytesRead;
         try {
-            bytesRead = channel.read(buffer, readPosition);
+            bytesRead = buffer.writeBytes(channel, readPosition, readAtLeast);
         } catch (IOException e) {
             throw new StorageException(e);
         }
         verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast);
-        buffer.flip();
     }
 
-    private @NonNull ByteBuffer setAndSlice(final int position, final int size) {
+    private @NonNull ByteBuf setAndSlice(final int position, final int size) {
         bufferPosition = position;
-        return buffer.slice(0, size).asReadOnlyBuffer();
+        return buffer.slice(0, size).asReadOnly();
     }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
index 687da045a3..7ca43c8fea 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java
@@ -18,21 +18,22 @@ package io.atomix.storage.journal;
 
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 /**
  * A {@link StorageLevel#DISK} {@link FileWriter}.
  */
 final class DiskFileWriter extends FileWriter {
-    private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+    private static final ByteBuf ZERO_ENTRY_HEADER = Unpooled.wrappedBuffer(new byte[HEADER_BYTES]);
 
     private final DiskFileReader reader;
     private final FileChannel channel;
-    private final ByteBuffer buffer;
+    private final ByteBuf buffer;
 
-    DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer) {
+    DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuf buffer) {
         super(file, maxEntrySize);
         this.buffer = requireNonNull(buffer);
         channel = file.channel();
@@ -47,21 +48,21 @@ final class DiskFileWriter extends FileWriter {
     @Override
     void writeEmptyHeader(final int position) {
         try {
-            channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
+            ZERO_ENTRY_HEADER.getBytes(0, channel, position, HEADER_BYTES);
         } catch (IOException e) {
             throw new StorageException(e);
         }
     }
 
     @Override
-    ByteBuffer startWrite(final int position, final int size) {
+    ByteBuf startWrite(final int position, final int size) {
         return buffer.clear().slice(0, size);
     }
 
     @Override
-    void commitWrite(final int position, final ByteBuffer entry) {
+    void commitWrite(final int position, final ByteBuf entry) {
         try {
-            channel.write(entry, position);
+            entry.readBytes(channel, position, entry.readableBytes());
         } catch (IOException e) {
             throw new StorageException(e);
         }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
index e9f06a15fc..dc44cca34a 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java
@@ -18,7 +18,7 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
@@ -44,7 +44,7 @@ abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
      * @param size to read
      * @return resulting buffer
      */
-    abstract @NonNull ByteBuffer read(int position, int size);
+    abstract @NonNull ByteBuf read(int position, int size);
 
     @Override
     public final String toString() {
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
index 262f248ec1..e2cda5274c 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java
@@ -18,8 +18,8 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /**
  * An abstraction over how to write a {@link JournalSegmentFile}.
@@ -57,15 +57,15 @@ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
 
     /**
      * Allocate file space. Note that the allocated space may be a buffer disconnected from the file. Any modifications
-     * to the returned buffer need to be committed via {@link #commitWrite(int, ByteBuffer)}.
+     * to the returned buffer need to be committed via {@link #commitWrite(int, ByteBuf)}.
      *
      * @param position position to start from
     * @param size the size to allocate
-     * @return A {@link ByteBuffer} covering the allocated area
+     * @return A {@link ByteBuf} covering the allocated area
      */
-    abstract ByteBuffer startWrite(int position, int size);
+    abstract ByteBuf startWrite(int position, int size);
 
-    abstract void commitWrite(int position, ByteBuffer entry);
+    abstract void commitWrite(int position, ByteBuf entry);
 
     /**
     * Flushes written entries to disk.
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
index 5b5a2da83f..8bb368f085 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java
@@ -18,13 +18,14 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import io.netty.buffer.ByteBufAllocator;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
 import java.nio.file.Path;
 import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
  * Segment file utility.
@@ -37,19 +38,20 @@ final class JournalSegmentFile {
     private static final String EXTENSION = "log";
 
     private final @NonNull JournalSegmentDescriptor descriptor;
+    private final @NonNull ByteBufAllocator allocator;
+    private final @NonNull RandomAccessFile file;
     private final @NonNull Path path;
-    private final RandomAccessFile file;
-
-    private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor,
-            final RandomAccessFile file) {
+
+    private JournalSegmentFile(final Path path, final ByteBufAllocator allocator,
+            final JournalSegmentDescriptor descriptor, final RandomAccessFile file) {
         this.path = requireNonNull(path);
+        this.allocator = requireNonNull(allocator);
         this.descriptor = requireNonNull(descriptor);
         this.file = requireNonNull(file);
     }
 
     static @NonNull JournalSegmentFile createNew(final String name, final File directory,
-            final JournalSegmentDescriptor descriptor) throws IOException {
+            final ByteBufAllocator allocator, final JournalSegmentDescriptor descriptor) throws IOException {
         final var file = createSegmentFile(name, directory, descriptor.id());
         final var raf = new RandomAccessFile(file, "rw");
         try {
@@ -59,10 +61,11 @@ final class JournalSegmentFile {
             raf.close();
             throw e;
         }
-        return new JournalSegmentFile(file.toPath(), descriptor, raf);
+        return new JournalSegmentFile(file.toPath(), allocator, descriptor, raf);
     }
 
-    static @NonNull JournalSegmentFile openExisting(final Path path) throws IOException {
+    static @NonNull JournalSegmentFile openExisting(final Path path, final ByteBufAllocator allocator)
+            throws IOException {
         final var raf = new RandomAccessFile(path.toFile(), "rw");
         final JournalSegmentDescriptor descriptor;
         try {
@@ -72,18 +75,27 @@ final class JournalSegmentFile {
             raf.close();
             throw e;
         }
-        return new JournalSegmentFile(path, descriptor, raf);
+        return new JournalSegmentFile(path, allocator, descriptor, raf);
     }
 
     /**
-     * Returns the segment file.
+     * Returns the segment file path.
      *
-     * @return The segment file.
+     * @return The segment file path
      */
     @NonNull Path path() {
         return path;
     }
 
+    /**
+     * Returns the {@link ByteBufAllocator} for this file.
+     *
+     * @return A {@link ByteBufAllocator}
+     */
+    @NonNull ByteBufAllocator allocator() {
+        return allocator;
+    }
+
     /**
      * Returns the segment version.
      *
@@ -131,10 +143,11 @@ final class JournalSegmentFile {
      * @return A {@link MappedFileAccess}
      * @throws IOException if an I/O error occurs
      */
-    @NonNull FileAccess newAccess(final StorageLevel level, final int maxEntrySize) throws IOException {
+    @NonNullByDefault
+    FileAccess newAccess(final StorageLevel level, final int maxEntrySize) throws IOException {
         return switch (level) {
             case DISK -> new DiskFileAccess(this, maxEntrySize);
-            case MAPPED -> new MappedFileAccess(this, maxEntrySize, channel().map(MapMode.READ_WRITE, 0, maxSize()));
+            case MAPPED -> MappedFileAccess.of(this, maxEntrySize);
         };
     }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
index e925ef8bb1..15accd3d62 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
@@ -19,7 +19,6 @@ import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,18 +99,16 @@ final class JournalSegmentReader {
         // Slice off the entry's bytes
         final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
         // If the stored checksum does not equal the computed checksum, do not proceed further
-        final var computed = SegmentEntry.computeChecksum(entryBuffer);
+        final var computed = SegmentEntry.computeChecksum(entryBuffer.nioBuffer());
         if (checksum != computed) {
             LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum),
                 Integer.toHexString(computed));
             invalidateCache();
             return null;
         }
-        // update position
+        // update position and return
         position += SegmentEntry.HEADER_BYTES + length;
-
-        // rewind and return
-        return Unpooled.wrappedBuffer(entryBuffer.rewind());
+        return entryBuffer;
     }
 
     /**
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
index 626361308d..9b083bba06 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
@@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull;
 import io.atomix.storage.journal.JournalSegment.Inactive;
 import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
-import io.netty.buffer.Unpooled;
 import java.io.EOFException;
 import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNull;
@@ -96,8 +95,8 @@ final class JournalSegmentWriter {
         // Allocate entry space
         final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
 
-        // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
-        final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
+        // Create a ByteBuf covering the bytes
+        final var bytes = diskEntry.slice(HEADER_BYTES, writeLimit);
         try {
             mapper.objectToBytes(entry, bytes);
         } catch (EOFException e) {
@@ -115,14 +114,15 @@ final class JournalSegmentWriter {
             throw new StorageException(e);
         }
 
-        // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
-        // the buffer, as we rewind() it back.
+        // Determine length, trim diskEntry and compute checksum.
         final var length = bytes.readableBytes();
-        final var checksum = SegmentEntry.computeChecksum(
-            diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
+        diskEntry.writerIndex(diskEntry.readerIndex() + HEADER_BYTES + length);
+
+        // Compute the checksum
+        final var checksum = SegmentEntry.computeChecksum(diskEntry.nioBuffer(HEADER_BYTES, length));
 
         // update the header and commit entry to file
-        fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
+        fileWriter.commitWrite(position, diskEntry.setInt(0, length).setInt(Integer.BYTES, checksum));
 
         // Update the last entry with the correct index/term/length.
         currentPosition = bodyPosition + length;
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedByteBuf.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedByteBuf.java
new file mode 100644
index 0000000000..b7c211357f
--- /dev/null
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedByteBuf.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.AbstractReferenceCountedByteBuf;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.util.internal.PlatformDependent;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * A {@link ByteBuf} backed by a {@link MappedByteBuffer}.
+ */
+final class MappedByteBuf extends AbstractReferenceCountedByteBuf implements Flushable {
+    private final ByteBufAllocator alloc;
+
+    private MappedByteBuffer byteBuffer;
+    private ByteBuffer internalNio;
+
+    private MappedByteBuf(final ByteBufAllocator alloc, final MappedByteBuffer byteBuffer) {
+        super(byteBuffer.limit());
+        this.alloc = requireNonNull(alloc);
+        this.byteBuffer = requireNonNull(byteBuffer);
+    }
+
+    @NonNullByDefault
+    static MappedByteBuf of(final JournalSegmentFile file) throws IOException {
+        return new MappedByteBuf(file.allocator(), file.channel().map(MapMode.READ_WRITE, 0, file.maxSize()));
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:avoidHidingCauseException")
+    public void flush() throws IOException {
+        ensureAccessible();
+        try {
+            byteBuffer.force();
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+    }
+
+    @Override
+    protected void deallocate() {
+        final var local = byteBuffer;
+        if (local != null) {
+            byteBuffer = null;
+            PlatformDependent.freeDirectBuffer(local);
+        }
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return alloc;
+    }
+
+    @Override
+    public boolean isContiguous() {
+        return true;
+    }
+
+    @Override
+    public boolean hasArray() {
+        return false;
+    }
+
+    @Override
+    public byte[] array() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int arrayOffset() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return false;
+    }
+
+    @Override
+    public long memoryAddress() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int capacity() {
+        return maxCapacity();
+    }
+
+    @Override
+    public ByteBuf capacity(final int newCapacity) {
+        throw new UnsupportedOperationException("capacity cannot be set");
+    }
+
+    @Override
+    @Deprecated
+    public ByteOrder order() {
+        return ByteOrder.BIG_ENDIAN;
+    }
+
+    @Override
+    public ByteBuf unwrap() {
+        return null;
+    }
+
+    @Override
+    public ByteBuf copy(final int index, final int length) {
+        ensureAccessible();
+        return alloc.heapBuffer(length).writeBytes(byteBuffer.slice(index, length));
+    }
+
+    @Override
+    public boolean isDirect() {
+        return true;
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return 1;
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(final int index, final int length) {
+        checkIndex(index, length);
+        return byteBuffer.slice(index, length);
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(final int index, final int length) {
+        checkIndex(index, length);
+        return internalNio().position(index).limit(index + length);
+    }
+
+    private ByteBuffer internalNio() {
+        var local = internalNio;
+        if (local == null) {
+            internalNio = local = byteBuffer.duplicate();
+        }
+        return local;
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(final int index, final int length) {
+        return new ByteBuffer[] { nioBuffer(index, length) };
+    }
+
+    @Override
+    public byte getByte(final int index) {
+        ensureAccessible();
+        return _getByte(index);
+    }
+
+    @Override
+    protected byte _getByte(final int index) {
+        return byteBuffer.get(index);
+    }
+
+    @Override
+    public short getShort(final int index) {
+        ensureAccessible();
+        return _getShort(index);
+    }
+
+    @Override
+    protected short _getShort(final int index) {
+        return byteBuffer.getShort(index);
+    }
+
+    @Override
+    protected short _getShortLE(final int index) {
+        return ByteBufUtil.swapShort(byteBuffer.getShort(index));
+    }
+
+    @Override
+    public int getUnsignedMedium(final int index) {
+        ensureAccessible();
+        return _getUnsignedMedium(index);
+    }
+
+    @Override
+    protected int _getUnsignedMedium(final int index) {
+        return (_getByte(index) & 0xff) << 16 | (_getByte(index + 1) & 0xff) << 8 | _getByte(index + 2) & 0xff;
+    }
+
+    @Override
+    protected int _getUnsignedMediumLE(final int index) {
+        return _getByte(index) & 0xff | (_getByte(index + 1) & 0xff) << 8 | (_getByte(index + 2) & 0xff) << 16;
+    }
+
+    @Override
+    public int getInt(final int index) {
+        ensureAccessible();
+        return _getInt(index);
+    }
+
+    @Override
+    protected int _getInt(final int index) {
+        return byteBuffer.getInt(index);
+    }
+
+    @Override
+    protected int _getIntLE(final int index) {
+        return ByteBufUtil.swapInt(byteBuffer.getInt(index));
+    }
+
+    @Override
+    public long getLong(final int index) {
+        ensureAccessible();
+        return _getLong(index);
+    }
+
+    @Override
+    protected long _getLong(final int index) {
+        return byteBuffer.getLong(index);
+    }
+
+    @Override
+    protected long _getLongLE(final int index) {
+        return ByteBufUtil.swapLong(byteBuffer.getLong(index));
+    }
+
+    @Override
+    public ByteBuf setByte(final int index, final int value) {
+        ensureAccessible();
+        _setByte(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setByte(final int index, final int value) {
+        byteBuffer.put(index, (byte) value);
+    }
+
+    @Override
+    public ByteBuf setShort(final int index, final int value) {
+        ensureAccessible();
+        _setShort(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setShort(final int index, final int value) {
+        byteBuffer.putShort(index, (short) value);
+    }
+
+    @Override
+    protected void _setShortLE(final int index, final int value) {
+        byteBuffer.putShort(index, ByteBufUtil.swapShort((short) value));
+    }
+
+    @Override
+    public ByteBuf setMedium(final int index, final int value) {
+        ensureAccessible();
+        _setMedium(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setMedium(final int index, final int value) {
+        setByte(index, (byte) (value >>> 16));
+        setByte(index + 1, (byte) (value >>> 8));
+        setByte(index + 2, (byte) value);
+    }
+
+    @Override
+    protected void _setMediumLE(final int index, final int value) {
+        setByte(index, (byte) value);
+        setByte(index + 1, (byte) (value >>> 8));
+        setByte(index + 2, (byte) (value >>> 16));
+    }
+
+    @Override
+    public ByteBuf setInt(final int index, final int value) {
+        ensureAccessible();
+        _setInt(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setInt(final int index, final int value) {
+        byteBuffer.putInt(index, value);
+    }
+
+    @Override
+    protected void _setIntLE(final int index, final int value) {
+        byteBuffer.putInt(index, ByteBufUtil.swapInt(value));
+    }
+
+    @Override
+    public ByteBuf setLong(final int index, final long value) {
+        ensureAccessible();
+        _setLong(index, value);
+        return this;
+    }
+
+    @Override
+    protected void _setLong(final int index, final long value) {
+        byteBuffer.putLong(index, value);
+    }
+
+    @Override
+    protected void _setLongLE(final int index, final long value) {
+        byteBuffer.putLong(index, ByteBufUtil.swapLong(value));
+    }
+
+    @Override
+    public ByteBuf getBytes(final int index, final ByteBuf dst, final int dstIndex, final int length) {
+        checkDstIndex(index, length, dstIndex, dst.capacity());
+        if (dst.hasArray()) {
+            byteBuffer.get(index, dst.array(), dst.arrayOffset() + dstIndex, length);
+        } else {
+            dst.setBytes(dstIndex, this, index, length);
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(final int index, final byte[] dst, final int dstIndex, final int length) {
+        checkDstIndex(index, length, dstIndex, dst.length);
+        byteBuffer.get(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(final int index, final ByteBuffer dst) {
+        final var remaining = dst.remaining();
+        checkIndex(index, remaining);
+        dst.put(byteBuffer.slice(index, remaining));
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(final int index, final OutputStream out, final int length) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getBytes(final int index, final GatheringByteChannel out, final int length) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getBytes(final int index, final FileChannel out, final long position, final int length)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuf setBytes(final int index, final ByteBuf src, final int srcIndex, final int length) {
+        checkSrcIndex(index, length, srcIndex, src.capacity());
+        src.getBytes(srcIndex, this, index, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(final int index, final byte[] src, final int srcIndex, final int length) {
+        checkSrcIndex(index, length, srcIndex, src.length);
+        byteBuffer.put(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(final int index, final ByteBuffer src) {
+        ensureAccessible();
+        byteBuffer.put(index, src, src.position(), src.remaining());
+        return this;
+    }
+
+    @Override
+    public int setBytes(final int index, final InputStream in, final int length) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int setBytes(final int index, final ScatteringByteChannel in, final int length) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int setBytes(final int index, final FileChannel in, final long position, final int length)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java
index 9966aebd7f..6b9683a876 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileAccess.java
@@ -17,42 +17,43 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import io.netty.util.internal.PlatformDependent;
-import java.io.UncheckedIOException;
-import java.nio.MappedByteBuffer;
+import java.io.IOException;
+import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
  * {@link FileAccess} for {@link StorageLevel#MAPPED}.
  */
-@NonNullByDefault
 final class MappedFileAccess extends FileAccess {
-    private final MappedByteBuffer mappedBuffer;
+    private MappedByteBuf mappedBuf;
 
-    MappedFileAccess(final JournalSegmentFile file, final int maxEntrySize, final MappedByteBuffer mappedBuffer) {
+    private MappedFileAccess(final @NonNull JournalSegmentFile file, final int maxEntrySize,
+            final MappedByteBuf mappedBuf) {
         super(file, maxEntrySize);
-        this.mappedBuffer = requireNonNull(mappedBuffer);
+        this.mappedBuf = requireNonNull(mappedBuf);
+    }
+
+    @NonNullByDefault
+    static MappedFileAccess of(final JournalSegmentFile file, final int maxEntrySize) throws IOException {
+        return new MappedFileAccess(file, maxEntrySize, MappedByteBuf.of(file));
     }
 
     @Override
     MappedFileReader newFileReader() {
-        return new MappedFileReader(file, mappedBuffer.slice());
+        return new MappedFileReader(file, mappedBuf.duplicate());
     }
 
     @Override
-    @SuppressWarnings("checkstyle:avoidHidingCauseException")
     MappedFileWriter newFileWriter() {
-        return new MappedFileWriter(file, maxEntrySize, mappedBuffer.slice(), () -> {
-            try {
-                mappedBuffer.force();
-            } catch (UncheckedIOException e) {
-                throw e.getCause();
-            }
-        });
+        return new MappedFileWriter(file, maxEntrySize, mappedBuf.duplicate(), mappedBuf);
     }
 
     @Override
     public void close() {
-        PlatformDependent.freeDirectBuffer(mappedBuffer);
+        final var toClose = mappedBuf;
+        if (toClose != null) {
+            mappedBuf = null;
+            toClose.release();
+        }
     }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
index 3eca4173db..2ec2e4ccc8 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileReader.java
@@ -15,17 +15,17 @@
  */
 package io.atomix.storage.journal;
 
-import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
 
 /**
  * A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file.
  */
 final class MappedFileReader extends FileReader {
-    private final ByteBuffer buffer;
+    private final ByteBuf buffer;
 
-    MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
+    MappedFileReader(final JournalSegmentFile file, final ByteBuf buffer) {
         super(file);
-        this.buffer = buffer.asReadOnlyBuffer();
+        this.buffer = buffer.asReadOnly();
     }
 
     @Override
@@ -34,7 +34,7 @@ final class MappedFileReader extends FileReader {
     }
 
     @Override
-    ByteBuffer read(final int position, final int size) {
+    ByteBuf read(final int position, final int size) {
         return buffer.slice(position, size);
     }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java
index 589f4d776c..3a73a98cc2 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java
@@ -17,19 +17,19 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
+import io.netty.buffer.ByteBuf;
 import java.io.Flushable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 /**
  * A {@link StorageLevel#MAPPED} {@link FileWriter}.
  */
 final class MappedFileWriter extends FileWriter {
     private final MappedFileReader reader;
-    private final ByteBuffer buffer;
+    private final ByteBuf buffer;
     private final Flushable flush;
 
-    MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer,
+    MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuf buffer,
             final Flushable flush) {
         super(file, maxEntrySize);
         this.buffer = requireNonNull(buffer);
@@ -45,16 +45,16 @@ final class MappedFileWriter extends FileWriter {
     @Override
     void writeEmptyHeader(final int position) {
         // Note: we issue a single putLong() instead of two putInt()s.
-        buffer.putLong(position, 0L);
+        buffer.setLong(position, 0L);
     }
 
     @Override
-    ByteBuffer startWrite(final int position, final int size) {
+    ByteBuf startWrite(final int position, final int size) {
         return buffer.slice(position, size);
     }
 
     @Override
-    void commitWrite(final int position, final ByteBuffer entry) {
+    void commitWrite(final int position, final ByteBuf entry) {
         // No-op, buffer is write-through
     }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
index e0be413734..2c6ccb5599 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
@@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
+import io.netty.buffer.ByteBufAllocator;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
@@ -43,16 +44,17 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
     private final Collection<SegmentedByteBufReader> readers = ConcurrentHashMap.newKeySet();
 
-    private final String name;
-    private final StorageLevel storageLevel;
-    private final File directory;
+    private final @NonNull ByteBufAllocator allocator;
+    private final @NonNull StorageLevel storageLevel;
+    private final @NonNull File directory;
+    private final @NonNull String name;
+    private final @NonNull ByteBufWriter writer;
     private final int maxSegmentSize;
     private final int maxEntrySize;
     @Deprecated(forRemoval = true)
     private final int maxEntriesPerSegment;
     private final double indexDensity;
     private final boolean flushOnCommit;
-    private final @NonNull ByteBufWriter writer;
 
     // null when closed
     private JournalSegment currentSegment;
@@ -60,10 +62,11 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
 
     SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
             final int maxSegmentSize, final int maxEntrySize, final int maxEntriesPerSegment, final double indexDensity,
-            final boolean flushOnCommit) {
+            final boolean flushOnCommit, final ByteBufAllocator allocator) {
         this.name = requireNonNull(name, "name cannot be null");
         this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
         this.directory = requireNonNull(directory, "directory cannot be null");
+        this.allocator = requireNonNull(allocator, "allocator cannot be null");
         this.maxSegmentSize = maxSegmentSize;
         this.maxEntrySize = maxEntrySize;
         this.maxEntriesPerSegment = maxEntriesPerSegment;
@@ -256,7 +259,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) {
         final JournalSegmentFile file;
         try {
-            file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
+            file = JournalSegmentFile.createNew(name, directory, allocator, JournalSegmentDescriptor.builder()
                 .withId(segmentId)
                 .withIndex(firstIndex)
                 .withMaxSegmentSize(maxSegmentSize)
@@ -307,7 +310,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
             if (JournalSegmentFile.isSegmentFile(name, file)) {
                 final JournalSegmentFile segmentFile;
                 try {
-                    segmentFile = JournalSegmentFile.openExisting(file.toPath());
+                    segmentFile = JournalSegmentFile.openExisting(file.toPath(), allocator);
                 } catch (IOException e) {
                     throw new StorageException(e);
                 }
@@ -467,6 +470,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
         private double indexDensity = DEFAULT_INDEX_DENSITY;
         private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
+        private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
 
         private Builder() {
             // on purpose
@@ -625,6 +629,18 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
             return this;
         }
 
+        /**
+         * Sets the {@link ByteBufAllocator} to use for allocating various buffers.
+         *
+         * @param byteBufAllocator the allocator to use
+         * @return The builder instance
+         */
+        @SuppressWarnings("checkstyle:hiddenField")
+        public Builder withByteBufAllocator(final ByteBufAllocator byteBufAllocator) {
+            this.byteBufAllocator = requireNonNull(byteBufAllocator);
+            return this;
+        }
+
         /**
          * Build the {@link SegmentedByteBufJournal}.
          *
@@ -632,7 +648,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
          */
         public SegmentedByteBufJournal build() {
             return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
-                maxEntriesPerSegment, indexDensity, flushOnCommit);
+                maxEntriesPerSegment, indexDensity, flushOnCommit, byteBufAllocator);
         }
     }
 }
-- 
2.36.6
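
Usage sketch (not part of the patch): the new Builder.withByteBufAllocator()
is the user-facing knob this change adds, letting callers wire a specific
Netty allocator, for example the pooled one, into the journal. A minimal
sketch, assuming the usual static builder() factory and
withName/withDirectory/withStorageLevel setters exist on the Builder; they
back the fields referenced by build() but are not shown in this diff:

    import io.netty.buffer.PooledByteBufAllocator;
    import java.io.File;

    // Journal whose segment files hand out buffers from the pooled allocator
    SegmentedByteBufJournal journal = SegmentedByteBufJournal.builder()
        .withName("example")                          // assumed setter
        .withDirectory(new File("/var/tmp/journal"))  // assumed setter
        .withStorageLevel(StorageLevel.DISK)          // assumed setter
        .withByteBufAllocator(PooledByteBufAllocator.DEFAULT)
        .build();

When no allocator is configured, the builder falls back to
ByteBufAllocator.DEFAULT, so existing callers keep their current behaviour.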
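Checksum note: the reader and writer now pass SegmentEntry.computeChecksum()
a ByteBuffer view obtained via ByteBuf.nioBuffer(). Such a view has its own
position/limit, independent of the ByteBuf's reader/writer indices, so a
consuming checksum pass no longer disturbs the buffer and the old "rewind it
back" dance could be dropped. A sketch of a checksum helper in that shape,
assuming CRC32 semantics; computeChecksum()'s actual body is not part of
this diff:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    static int computeChecksum(final ByteBuffer bytes) {
        final var crc32 = new CRC32();
        // consumes the view's position only; the originating ByteBuf is untouched
        crc32.update(bytes);
        return (int) crc32.getValue();
    }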