From 32bb8e8275884dd0e6dee40b02785c2e606a0914 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 26 Mar 2024 21:43:19 +0100 Subject: [PATCH] Unify JournalSegmentWriter.reset(long) We have two distinct implementations here. The version for StorageLevel.DISK is generic enough to work for StorageLevel.MAPPED, and will guarantee consistency with reader implementation (which is already shared across StorageLevels). JIRA: CONTROLLER-2100 Change-Id: I3e9a5c8a72be766431e5cb527cae1f4809964dd7 Signed-off-by: Robert Varga --- .../journal/DiskJournalSegmentWriter.java | 47 ++---------- .../storage/journal/JournalSegmentWriter.java | 53 ++++++++++++-- .../journal/MappedJournalSegmentWriter.java | 71 ++++--------------- 3 files changed, 64 insertions(+), 107 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java index b634169fe8..7120158839 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java @@ -49,9 +49,6 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { private final JournalSegmentReader reader; private final ByteBuffer buffer; - private Indexed lastEntry; - private int currentPosition; - DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, final JournalIndex index, final JournalSerdes namespace) { super(channel, segment, maxEntrySize, index, namespace); @@ -62,14 +59,12 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { reset(0); } - DiskJournalSegmentWriter(final JournalSegmentWriter previous, final int position) { + DiskJournalSegmentWriter(final JournalSegmentWriter previous) { super(previous); buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize); reader = new JournalSegmentReader<>(segment, new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace); - lastEntry = previous.getLastEntry(); - currentPosition = position; } @Override @@ -79,7 +74,7 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { @Override MappedJournalSegmentWriter toMapped() { - return new MappedJournalSegmentWriter<>(this, currentPosition); + return new MappedJournalSegmentWriter<>(this); } @Override @@ -88,42 +83,8 @@ final class DiskJournalSegmentWriter extends JournalSegmentWriter { } @Override - void reset(final long index) { - // acquire ownership of cache and make sure reader does not see anything we've done once we're done - reader.invalidateCache(); - try { - resetWithBuffer(index); - } finally { - // Make sure reader does not see anything we've done - reader.invalidateCache(); - } - } - - private void resetWithBuffer(final long index) { - long nextIndex = firstIndex; - - // Clear the buffer indexes and acquire ownership of the buffer - currentPosition = JournalSegmentDescriptor.BYTES; - reader.setPosition(JournalSegmentDescriptor.BYTES); - - while (index == 0 || nextIndex <= index) { - final var entry = reader.readEntry(nextIndex); - if (entry == null) { - break; - } - - lastEntry = entry; - this.index.index(nextIndex, currentPosition); - nextIndex++; - - // Update the current position for indexing. - currentPosition = currentPosition + HEADER_BYTES + entry.size(); - } - } - - @Override - Indexed getLastEntry() { - return lastEntry; + JournalSegmentReader reader() { + return reader; } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 23b32b2e09..f6a7e94987 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -15,6 +15,7 @@ */ package io.atomix.storage.journal; +import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.index.JournalIndex; @@ -32,6 +33,10 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, final int maxEntrySize; final long firstIndex; + // FIXME: hide these two fields + Indexed lastEntry; + int currentPosition; + JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, final JournalIndex index, final JournalSerdes namespace) { this.channel = requireNonNull(channel); @@ -51,6 +56,8 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, maxSegmentSize = previous.maxSegmentSize; maxEntrySize = previous.maxEntrySize; firstIndex = previous.firstIndex; + lastEntry = previous.lastEntry; + currentPosition = previous.currentPosition; } /** @@ -59,8 +66,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, * @return The last written index. */ final long getLastIndex() { - final Indexed lastEntry; - return (lastEntry = getLastEntry()) != null ? lastEntry.index() : firstIndex - 1; + return lastEntry != null ? lastEntry.index() : firstIndex - 1; } /** @@ -68,7 +74,9 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, * * @return The last entry written. */ - abstract Indexed getLastEntry(); + final Indexed getLastEntry() { + return lastEntry; + } /** * Returns the next index to be written. @@ -76,8 +84,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, * @return The next index to be written. */ final long getNextIndex() { - final Indexed lastEntry; - return (lastEntry = getLastEntry()) != null ? lastEntry.index() + 1 : firstIndex; + return lastEntry != null ? lastEntry.index() + 1 : firstIndex; } /** @@ -93,7 +100,41 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, * * @param index the index to which to reset the head of the segment */ - abstract void reset(long index); + final void reset(final long index) { + // acquire ownership of cache and make sure reader does not see anything we've done once we're done + final var reader = reader(); + reader.invalidateCache(); + try { + resetWithBuffer(reader, index); + } finally { + // Make sure reader does not see anything we've done + reader.invalidateCache(); + } + } + + abstract JournalSegmentReader reader(); + + private void resetWithBuffer(final JournalSegmentReader reader, final long index) { + long nextIndex = firstIndex; + + // Clear the buffer indexes and acquire ownership of the buffer + currentPosition = JournalSegmentDescriptor.BYTES; + reader.setPosition(JournalSegmentDescriptor.BYTES); + + while (index == 0 || nextIndex <= index) { + final var entry = reader.readEntry(nextIndex); + if (entry == null) { + break; + } + + lastEntry = entry; + this.index.index(nextIndex, currentPosition); + nextIndex++; + + // Update the current position for indexing. + currentPosition = currentPosition + HEADER_BYTES + entry.size(); + } + } /** * Truncates the log to the given index. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index a9fb5b4088..9f437b6125 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -45,29 +45,27 @@ import org.eclipse.jdt.annotation.NonNull; */ final class MappedJournalSegmentWriter extends JournalSegmentWriter { private final @NonNull MappedByteBuffer mappedBuffer; + private final JournalSegmentReader reader; private final ByteBuffer buffer; - private Indexed lastEntry; - private int currentPosition; - - MappedJournalSegmentWriter( - final FileChannel channel, - final JournalSegment segment, - final int maxEntrySize, - final JournalIndex index, - final JournalSerdes namespace) { + MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize, + final JournalIndex index, final JournalSerdes namespace) { super(channel, segment, maxEntrySize, index, namespace); + mappedBuffer = mapBuffer(channel, maxSegmentSize); buffer = mappedBuffer.slice(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); reset(0); } - MappedJournalSegmentWriter(final JournalSegmentWriter previous, final int position) { + MappedJournalSegmentWriter(final JournalSegmentWriter previous) { super(previous); + mappedBuffer = mapBuffer(channel, maxSegmentSize); buffer = mappedBuffer.slice(); - currentPosition = position; - lastEntry = previous.getLastEntry(); + reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer), + maxEntrySize, namespace); } private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { @@ -91,55 +89,12 @@ final class MappedJournalSegmentWriter extends JournalSegmentWriter { @Override DiskJournalSegmentWriter toFileChannel() { close(); - return new DiskJournalSegmentWriter<>(this, currentPosition); - } - - @Override - void reset(final long index) { - long nextIndex = firstIndex; - - // Clear the buffer indexes. - currentPosition = JournalSegmentDescriptor.BYTES; - - int length = buffer.getInt(currentPosition); - - // If the length is non-zero, read the entry. - while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) { - - // Read the checksum of the entry. - final long checksum = buffer.getInt(currentPosition + Integer.BYTES); - - // Slice off the entry's bytes - final var entryBytes = buffer.slice(currentPosition + SegmentEntry.HEADER_BYTES, length); - - // Compute the checksum for the entry bytes. - final var crc32 = new CRC32(); - crc32.update(entryBytes); - - // If the stored checksum does not equal the computed checksum, do not proceed further - if (checksum != (int) crc32.getValue()) { - break; - } - - entryBytes.rewind(); - final E entry = namespace.deserialize(entryBytes); - lastEntry = new Indexed<>(nextIndex, entry, length); - this.index.index(nextIndex, currentPosition); - nextIndex++; - - // Update the current position for indexing. - currentPosition = currentPosition + SegmentEntry.HEADER_BYTES + length; - - if (currentPosition + SegmentEntry.HEADER_BYTES >= maxSegmentSize) { - break; - } - length = buffer.getInt(currentPosition); - } + return new DiskJournalSegmentWriter<>(this); } @Override - Indexed getLastEntry() { - return lastEntry; + JournalSegmentReader reader() { + return reader; } @Override -- 2.36.6