From 7f3eb922fce09a366143387ee71bf6f10b698473 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sat, 23 Mar 2024 22:00:34 +0100 Subject: [PATCH] Refactor JournalSegmentReader We have almost-duplicated code in {Disk,Mapped}JournalSegmentReader, which looks very similar to what DiskJournalSegmentWriter revolves around. JournalSegmentReader revolves around tracking current position and interpreting bytes read. Its specializations provide the means for access. JIRA: CONTROLLER-2109 Change-Id: I80b78cde5baf6b222d888e635dd5c854331f261a Signed-off-by: Robert Varga --- .../journal/DiskJournalSegmentReader.java | 120 +++++++++--------- .../storage/journal/JournalSegmentReader.java | 108 ++++++++++++++-- .../journal/MappedJournalSegmentReader.java | 66 ++-------- 3 files changed, 172 insertions(+), 122 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java index bf02a90fe0..766a7e7d65 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentReader.java @@ -16,11 +16,12 @@ */ package io.atomix.storage.journal; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + import java.io.IOException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.zip.CRC32; /** * Log segment reader. @@ -28,74 +29,71 @@ import java.util.zip.CRC32; * @author Jordan Halterman */ final class DiskJournalSegmentReader extends JournalSegmentReader { - private final FileChannel channel; - private final ByteBuffer memory; - private long currentPosition; - - DiskJournalSegmentReader( - FileChannel channel, - JournalSegment segment, - int maxEntrySize, - JournalSerdes namespace) { - super(segment, maxEntrySize, namespace); - this.channel = channel; - this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2); - } + private final FileChannel channel; + private final ByteBuffer buffer; - @Override - void setPosition(int position) { - currentPosition = position; - memory.clear().flip(); - } + // tracks where memory's first available byte maps to in terms of FileChannel.position() + private int bufferPosition; - @Override - Indexed readEntry(final long index) { - try { - // Read more bytes from the segment if necessary. - if (memory.remaining() < maxEntrySize) { - long position = currentPosition + memory.position(); - channel.read(memory.clear(), position); - currentPosition = position; - memory.flip(); - } + DiskJournalSegmentReader(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalSerdes namespace) { + super(segment, maxEntrySize, namespace); + this.channel = requireNonNull(channel); + buffer = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2).flip(); + bufferPosition = 0; + } - // Mark the buffer so it can be reset if necessary. - memory.mark(); + @Override void invalidateCache() { + buffer.clear().flip(); + bufferPosition = 0; + } - try { - // Read the length of the entry. - final int length = memory.getInt(); + @Override ByteBuffer read(final int position, final int size) { + // calculate logical seek distance between buffer's first byte and position and split flow between + // forward-moving and backwards-moving code paths. + final int seek = bufferPosition - position; + return seek >= 0 ? forwardAndRead(seek, position, size) : rewindAndRead(-seek, position, size); + } - // If the buffer length is zero then return. - if (length <= 0 || length > maxEntrySize) { - memory.reset().limit(memory.position()); - return null; + private ByteBuffer forwardAndRead(final int seek, final int position, final int size) { + final int missing = buffer.limit() - seek - size; + if (missing <= 0) { + // fast path: we have the requested region + return buffer.slice(seek, size).asReadOnlyBuffer(); } - // Read the checksum of the entry. - long checksum = memory.getInt() & 0xFFFFFFFFL; + // We need to read more data, but let's salvage what we can: + // - set buffer position to seek, which means it points to the same as position + // - run compact, which moves everything between position and limit onto the beginning of buffer and + // sets it up to receive more bytes + // - start the read accounting for the seek + buffer.position(seek).compact(); + readAtLeast(position + seek, missing); + return setAndSlice(position, size); + } - // Compute the checksum for the entry bytes. - final CRC32 crc32 = new CRC32(); - crc32.update(memory.array(), memory.position(), length); + ByteBuffer rewindAndRead(final int rewindBy, final int position, final int size) { + // TODO: Lazy solution. To be super crisp, we want to find out how much of the buffer we can salvage and + // do all the limit/position fiddling before and after read. Right now let's just flow the buffer up and + // read it. + buffer.clear(); + readAtLeast(position, size); + return setAndSlice(position, size); + } - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - int limit = memory.limit(); - memory.limit(memory.position() + length); - E entry = namespace.deserialize(memory); - memory.limit(limit); - return new Indexed<>(index, entry, length); - } else { - memory.reset().limit(memory.position()); - return null; + void readAtLeast(final int readPosition, final int readAtLeast) { + final int bytesRead; + try { + bytesRead = channel.read(buffer, readPosition); + } catch (IOException e) { + throw new StorageException(e); } - } catch (BufferUnderflowException e) { - memory.reset().limit(memory.position()); - return null; - } - } catch (IOException e) { - throw new StorageException(e); + verify(bytesRead >= readAtLeast, "Short read %s, expected %s", bytesRead, readAtLeast); + buffer.flip(); + } + + private ByteBuffer setAndSlice(final int position, final int size) { + bufferPosition = position; + return buffer.slice(0, size).asReadOnlyBuffer(); } - } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index 44c990f5a3..af07a1e953 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -15,26 +15,40 @@ */ package io.atomix.storage.journal; +import static com.google.common.base.Verify.verify; import static java.util.Objects.requireNonNull; +import com.esotericsoftware.kryo.KryoException; +import java.nio.ByteBuffer; +import java.util.zip.CRC32; import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, MappedJournalSegmentReader { - final int maxEntrySize; - final JournalSerdes namespace; + private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class); + private final JournalSegment segment; + private final JournalSerdes namespace; + private final int maxSegmentSize; + private final int maxEntrySize; + + private int position; JournalSegmentReader(final JournalSegment segment, final int maxEntrySize, final JournalSerdes namespace) { this.segment = requireNonNull(segment); + maxSegmentSize = segment.descriptor().maxSegmentSize(); this.maxEntrySize = maxEntrySize; this.namespace = requireNonNull(namespace); } /** - * Close this reader. + * Return the current position. + * + * @return current position. */ - final void close() { - segment.closeReader(this); + final int position() { + return position; } /** @@ -42,13 +56,91 @@ abstract sealed class JournalSegmentReader permits DiskJournalSegmentReader, * * @param position new position */ - abstract void setPosition(int position); + final void setPosition(final int position) { + verify(position >= JournalSegmentDescriptor.BYTES && position < maxSegmentSize, + "Invalid position %s", position); + this.position = position; + invalidateCache(); + } /** - * Reads the entry at specified index. + * Invalidate any cache that is present, so that the next read is coherent with the backing file. + */ + abstract void invalidateCache(); + + /** + * Reads the next entry, assigning it specified index. * * @param index entry index * @return The entry, or {@code null} */ - abstract @Nullable Indexed readEntry(long index); + final @Nullable Indexed readEntry(final long index) { + // Check if there is enough in the buffer remaining + final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES; + if (remaining < 0) { + // Not enough space in the segment, there can never be another entry + return null; + } + + // Calculate maximum entry length not exceeding file size nor maxEntrySize + final var maxLength = Math.min(remaining, maxEntrySize); + final var buffer = read(position, maxLength + SegmentEntry.HEADER_BYTES); + + // Read the entry length + final var length = buffer.getInt(0); + if (length < 1 || length > maxLength) { + // Invalid length, make sure next read re-tries + invalidateCache(); + return null; + } + + // Read the entry checksum + final int checksum = buffer.getInt(Integer.BYTES); + + // Slice off the entry's bytes + final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length); + // Compute the checksum for the entry bytes. + final var crc32 = new CRC32(); + crc32.update(entryBytes); + + // If the stored checksum does not equal the computed checksum, do not proceed further + final var computed = (int) crc32.getValue(); + if (checksum != computed) { + LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed)); + invalidateCache(); + return null; + } + + // Attempt to deserialize + final E entry; + try { + entry = namespace.deserialize(entryBytes.rewind()); + } catch (KryoException e) { + // TODO: promote this to a hard error, as it should never happen + LOG.debug("Failed to deserialize entry", e); + invalidateCache(); + return null; + } + + // We are all set. Update the position. + position = position + SegmentEntry.HEADER_BYTES + length; + return new Indexed<>(index, entry, length); + } + + /** + * Read the some bytes as specified position. The sum of position and size is guaranteed not to exceed + * {@link #maxSegmentSize}. + * + * @param position position to the entry header + * @param size to read, guaranteed to not exceed {@link #maxEntrySize} + * @return resulting buffer + */ + abstract ByteBuffer read(int position, int size); + + /** + * Close this reader. + */ + final void close() { + segment.closeReader(this); + } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java index bbf100740a..e905972358 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java @@ -16,9 +16,7 @@ */ package io.atomix.storage.journal; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.util.zip.CRC32; /** * Log segment reader. @@ -26,59 +24,21 @@ import java.util.zip.CRC32; * @author Jordan Halterman */ final class MappedJournalSegmentReader extends JournalSegmentReader { - private final ByteBuffer buffer; + private final ByteBuffer buffer; - MappedJournalSegmentReader( - ByteBuffer buffer, - JournalSegment segment, - int maxEntrySize, - JournalSerdes namespace) { - super(segment, maxEntrySize, namespace); - this.buffer = buffer.slice(); - } - - @Override - void setPosition(int position) { - buffer.position(position); - } - - @Override - Indexed readEntry(final long index) { - // Mark the buffer so it can be reset if necessary. - buffer.mark(); - - try { - // Read the length of the entry. - final int length = buffer.getInt(); - - // If the buffer length is zero then return. - if (length <= 0 || length > maxEntrySize) { - buffer.reset(); - return null; - } - - // Read the checksum of the entry. - long checksum = buffer.getInt() & 0xFFFFFFFFL; + MappedJournalSegmentReader(final ByteBuffer buffer, final JournalSegment segment, final int maxEntrySize, + final JournalSerdes namespace) { + super(segment, maxEntrySize, namespace); + this.buffer = buffer.slice().asReadOnlyBuffer(); + } - // Compute the checksum for the entry bytes. - final CRC32 crc32 = new CRC32(); - ByteBuffer slice = buffer.slice(); - slice.limit(length); - crc32.update(slice); + @Override + void invalidateCache() { + // No-op: the mapping is guaranteed to be coherent + } - // If the stored checksum equals the computed checksum, return the entry. - if (checksum == crc32.getValue()) { - slice.rewind(); - E entry = namespace.deserialize(slice); - buffer.position(buffer.position() + length); - return new Indexed<>(index, entry, length); - } else { - buffer.reset(); - return null; - } - } catch (BufferUnderflowException e) { - buffer.reset(); - return null; + @Override + ByteBuffer read(final int position, final int size) { + return buffer.slice(position, size); } - } } -- 2.36.6