From ece1376a462788f613f27ac5b099f364896de773 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 5 May 2024 23:24:58 +0200 Subject: [PATCH 01/16] Retain RandomAccessFile in JournalSegmentFile Having a RandomAccessFile is nice, as it internally holds a FileChannel. This makes our files stateful -- which is okay, as we still manage their lifecycle via JournalSegment. JIRA: CONTROLLER-2099 Change-Id: Id8305c74dbd881eaf52d84191c11bb4ea2bc164b Signed-off-by: Robert Varga --- .../storage/journal/DiskFileReader.java | 9 ++--- .../storage/journal/DiskFileWriter.java | 10 +++-- .../io/atomix/storage/journal/FileReader.java | 2 +- .../io/atomix/storage/journal/FileWriter.java | 5 +-- .../storage/journal/JournalSegment.java | 31 ++------------ .../journal/JournalSegmentDescriptor.java | 23 +++++------ .../storage/journal/JournalSegmentFile.java | 40 ++++++++++++++++--- .../storage/journal/MappedFileWriter.java | 8 ++-- .../storage/journal/SegmentedJournal.java | 8 +++- 9 files changed, 70 insertions(+), 66 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java index 697604e600..a934a90308 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -16,7 +16,6 @@ package io.atomix.storage.journal; import static com.google.common.base.Verify.verify; -import static java.util.Objects.requireNonNull; import java.io.IOException; import java.nio.ByteBuffer; @@ -38,14 +37,14 @@ final class DiskFileReader extends FileReader { // tracks where memory's first available byte maps to in terms of FileChannel.position() private int bufferPosition; - DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize)); + DiskFileReader(final JournalSegmentFile 
file, final int maxEntrySize) { + this(file, allocateBuffer(file.maxSize(), maxEntrySize)); } // Note: take ownership of the buffer - DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) { + DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) { super(file); - this.channel = requireNonNull(channel); + channel = file.channel(); this.buffer = buffer.flip(); bufferPosition = 0; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java index ffa11e819b..74fb2d8be8 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java @@ -29,12 +29,14 @@ final class DiskFileWriter extends FileWriter { private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]); private final DiskFileReader reader; + private final FileChannel channel; private final ByteBuffer buffer; - DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - super(file, channel, maxEntrySize); + DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) { + super(file, maxEntrySize); + channel = file.channel(); buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize); - reader = new DiskFileReader(file, channel, buffer); + reader = new DiskFileReader(file, buffer); } @Override @@ -50,7 +52,7 @@ final class DiskFileWriter extends FileWriter { @Override MappedFileWriter toMapped() { flush(); - return new MappedFileWriter(file, channel, maxEntrySize); + return new MappedFileWriter(file, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java index 0a9bb3ef11..e9f06a15fc 100644 --- 
a/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileReader.java @@ -25,7 +25,7 @@ import org.eclipse.jdt.annotation.NonNull; * An abstraction over how to read a {@link JournalSegmentFile}. */ abstract sealed class FileReader permits DiskFileReader, MappedFileReader { - private final JournalSegmentFile file; + private final @NonNull JournalSegmentFile file; FileReader(final JournalSegmentFile file) { this.file = requireNonNull(file); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java index 3e566fe90b..a677fc3d26 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/FileWriter.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; import org.eclipse.jdt.annotation.Nullable; /** @@ -28,12 +27,10 @@ import org.eclipse.jdt.annotation.Nullable; */ abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter { final JournalSegmentFile file; - final FileChannel channel; final int maxEntrySize; - FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { + FileWriter(final JournalSegmentFile file, final int maxEntrySize) { this.file = requireNonNull(file); - this.channel = requireNonNull(channel); this.maxEntrySize = maxEntrySize; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index a7f4c5aadc..b73d942c03 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ 
-23,9 +23,7 @@ import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; import io.atomix.storage.journal.index.SparseJournalIndex; import java.io.IOException; -import java.nio.channels.FileChannel; import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +45,6 @@ final class JournalSegment { private final JournalIndex journalIndex; private final Set readers = ConcurrentHashMap.newKeySet(); private final AtomicInteger references = new AtomicInteger(); - private final FileChannel channel; private JournalSegmentWriter writer; private boolean open = true; @@ -61,16 +58,10 @@ final class JournalSegment { this.storageLevel = requireNonNull(storageLevel); this.maxEntrySize = maxEntrySize; journalIndex = new SparseJournalIndex(indexDensity); - try { - channel = FileChannel.open(file.path(), - StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); - } catch (IOException e) { - throw new StorageException(e); - } final var fileWriter = switch (storageLevel) { - case DISK -> new DiskFileWriter(file, channel, maxEntrySize); - case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize); + case DISK -> new DiskFileWriter(file, maxEntrySize); + case MAPPED -> new MappedFileWriter(file, maxEntrySize); }; writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex) // relinquish mapped memory @@ -95,19 +86,6 @@ final class JournalSegment { return writer.getLastIndex(); } - /** - * Returns the size of the segment. - * - * @return the size of the segment - */ - int size() { - try { - return (int) channel.size(); - } catch (IOException e) { - throw new StorageException(e); - } - } - /** * Returns the segment file. * @@ -179,8 +157,7 @@ final class JournalSegment { acquire(); final var buffer = writer.buffer(); - final var fileReader = buffer != null ? 
new MappedFileReader(file, buffer) - : new DiskFileReader(file, channel, maxEntrySize); + final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize); final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize); reader.setPosition(JournalSegmentDescriptor.BYTES); readers.add(reader); @@ -235,7 +212,7 @@ final class JournalSegment { private void finishClose() { writer.close(); try { - channel.close(); + file.close(); } catch (IOException e) { throw new StorageException(e); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java index 24652f003b..97dbab72ef 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentDescriptor.java @@ -18,9 +18,7 @@ package io.atomix.storage.journal; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; +import java.nio.channels.ReadableByteChannel; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; @@ -69,23 +67,20 @@ public record JournalSegmentDescriptor( static final int VERSION = 1; /** - * Read a JournalSegmentDescriptor from a {@link Path}. + * Read a JournalSegmentDescriptor from a {@link ReadableByteChannel}. 
* - * @param path path to read from + * @param channel channel to read from * @return A {@link JournalSegmentDescriptor} * @throws IOException if an I/O error occurs or there is not enough data */ - public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException { - final byte[] bytes; - try (var is = Files.newInputStream(path, StandardOpenOption.READ)) { - bytes = is.readNBytes(BYTES); + public static @NonNull JournalSegmentDescriptor readFrom(final ReadableByteChannel channel) throws IOException { + final var buffer = ByteBuffer.allocate(BYTES); + final var read = channel.read(buffer); + if (read != BYTES) { + throw new IOException("Need " + BYTES + " bytes, only " + read + " available"); } - if (bytes.length != BYTES) { - throw new IOException("Need " + BYTES + " bytes, only " + bytes.length + " available"); - } - - final var buffer = ByteBuffer.wrap(bytes); + buffer.flip(); return new JournalSegmentDescriptor( buffer.getInt(), buffer.getLong(), diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index a7ab481dbb..825baa27a2 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; @@ -37,25 +38,40 @@ final class JournalSegmentFile { private final @NonNull JournalSegmentDescriptor descriptor; private final @NonNull Path path; - private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) { + private final RandomAccessFile file; + + private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor, + final 
RandomAccessFile file) { this.path = requireNonNull(path); this.descriptor = requireNonNull(descriptor); + this.file = requireNonNull(file); } static @NonNull JournalSegmentFile createNew(final String name, final File directory, final JournalSegmentDescriptor descriptor) throws IOException { final var file = createSegmentFile(name, directory, descriptor.id()); - try (var raf = new RandomAccessFile(file, "rw")) { + final var raf = new RandomAccessFile(file, "rw"); + try { raf.setLength(descriptor.maxSegmentSize()); raf.write(descriptor.toArray()); + } catch (IOException e) { + raf.close(); + throw e; } - return new JournalSegmentFile(file.toPath(), descriptor); + return new JournalSegmentFile(file.toPath(), descriptor, raf); } static @NonNull JournalSegmentFile openExisting(final Path path) throws IOException { - // read the descriptor - final var descriptor = JournalSegmentDescriptor.readFrom(path); - return new JournalSegmentFile(path, descriptor); + final var raf = new RandomAccessFile(path.toFile(), "rw"); + final JournalSegmentDescriptor descriptor; + try { + // read the descriptor + descriptor = JournalSegmentDescriptor.readFrom(raf.getChannel()); + } catch (IOException e) { + raf.close(); + throw e; + } + return new JournalSegmentFile(path, descriptor, raf); } /** @@ -80,6 +96,18 @@ final class JournalSegmentFile { return descriptor.maxSegmentSize(); } + int size() throws IOException { + return (int) file.length(); + } + + FileChannel channel() { + return file.getChannel(); + } + + void close() throws IOException { + file.close(); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java index 0849cffd0b..f91cdc827a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java +++ 
b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java @@ -30,10 +30,10 @@ final class MappedFileWriter extends FileWriter { private final MappedFileReader reader; private final ByteBuffer buffer; - MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) { - super(file, channel, maxEntrySize); + MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) { + super(file, maxEntrySize); - mappedBuffer = mapBuffer(channel, file.maxSize()); + mappedBuffer = mapBuffer(file.channel(), file.maxSize()); buffer = mappedBuffer.slice(); reader = new MappedFileReader(file, mappedBuffer); } @@ -64,7 +64,7 @@ final class MappedFileWriter extends FileWriter { @Override DiskFileWriter toDisk() { close(); - return new DiskFileWriter(file, channel, maxEntrySize); + return new DiskFileWriter(file, maxEntrySize); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 7e821277ce..23a5419b83 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -191,7 +191,13 @@ public final class SegmentedJournal implements Journal { */ public long size() { return segments.values().stream() - .mapToLong(JournalSegment::size) + .mapToLong(segment -> { + try { + return segment.file().size(); + } catch (IOException e) { + throw new StorageException(e); + } + }) .sum(); } -- 2.36.6 From 91f69cce3299f17c524fd1af560cdaf2ae9ec337 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 00:48:01 +0200 Subject: [PATCH 02/16] Do not expose descriptor from JournalSegmentFile JournalSegmentFile should expose interesting mapping -- the descriptor is really an internal thing. 
JIRA: CONTROLLER-2099 Change-Id: I2276cee3c4cb15d961aaa1b7f72d618f10dd744e Signed-off-by: Robert Varga --- .../storage/journal/JournalSegment.java | 11 ++++---- .../storage/journal/JournalSegmentFile.java | 26 ++++++++++++++++--- .../storage/journal/SegmentedJournal.java | 4 +-- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index b73d942c03..2128b87e20 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -74,7 +74,7 @@ final class JournalSegment { * @return The segment's starting index. */ long firstIndex() { - return file.descriptor().index(); + return file.firstIndex(); } /** @@ -233,11 +233,10 @@ final class JournalSegment { @Override public String toString() { - final var descriptor = file.descriptor(); return MoreObjects.toStringHelper(this) - .add("id", descriptor.id()) - .add("version", descriptor.version()) - .add("index", descriptor.index()) - .toString(); + .add("id", file.segmentId()) + .add("version", file.version()) + .add("index", file.firstIndex()) + .toString(); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index 825baa27a2..04e22a1659 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -84,12 +84,30 @@ final class JournalSegmentFile { } /** - * Returns the segment descriptor. + * Returns the segment version. * - * @return The segment descriptor. 
+ * @return the segment version */ - @NonNull JournalSegmentDescriptor descriptor() { - return descriptor; + int version() { + return descriptor.version(); + } + + /** + * Returns the segment identifier. + * + * @return the segment identifier + */ + long segmentId() { + return descriptor.id(); + } + + /** + * Returns the index of first entry stored in this file. + * + * @return the index of first entry stored in this file + */ + long firstIndex() { + return descriptor.index(); } int maxSize() { diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 23a5419b83..1ae77fa351 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -342,7 +342,7 @@ public final class SegmentedJournal implements Journal { final var index = currentSegment.lastIndex() + 1; final var lastSegment = getLastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index); + currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); segments.put(index, currentSegment); return currentSegment; } @@ -436,7 +436,7 @@ public final class SegmentedJournal implements Journal { } // Load the segment. - LOG.debug("Loaded disk segment: {} ({})", segmentFile.descriptor().id(), segmentFile.path()); + LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path()); // Add the segment to the segments list. 
final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); -- 2.36.6 From b2f070be60f9c49f74ec4fc0198460fb461c180a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 01:22:47 +0200 Subject: [PATCH 03/16] Centralize CRC32 computation We have essentially-duplicate codepaths between JournalSegment{Reader,Writer}. Move computation to SegmentEntry. JIRA: CONTROLLER-2115 Change-Id: I776b693b6e88d84ddb99c274371ac694aa536d1d Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../storage/journal/JournalSegmentReader.java | 12 +++--------- .../storage/journal/JournalSegmentWriter.java | 11 ++++------- .../atomix/storage/journal/SegmentEntry.java | 18 ++++++++++++++++-- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index 5258f4323c..bba89dfdc9 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -20,7 +20,6 @@ import static java.util.Objects.requireNonNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import java.util.zip.CRC32; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,12 +100,8 @@ final class JournalSegmentReader { // Slice off the entry's bytes final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length); - // Compute the checksum for the entry bytes. 
- final var crc32 = new CRC32(); - crc32.update(entryBuffer); - // If the stored checksum does not equal the computed checksum, do not proceed further - final var computed = (int) crc32.getValue(); + final var computed = SegmentEntry.computeChecksum(entryBuffer); if (checksum != computed) { LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed)); invalidateCache(); @@ -116,9 +111,8 @@ final class JournalSegmentReader { // update position position += SegmentEntry.HEADER_BYTES + length; - // return bytes - entryBuffer.rewind(); - return Unpooled.buffer(length).writeBytes(entryBuffer); + // rewind and return + return Unpooled.buffer(length).writeBytes(entryBuffer.rewind()); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 317e8fd45b..63f5303ecc 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -21,7 +21,6 @@ import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.index.JournalIndex; import io.netty.buffer.ByteBuf; import java.nio.MappedByteBuffer; -import java.util.zip.CRC32; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; @@ -103,16 +102,14 @@ final class JournalSegmentWriter { } // allocate buffer and write data - final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES); - writeBuffer.put(buf.nioBuffer()); + final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES); + writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length); // Compute the checksum for the entry. 
- final var crc32 = new CRC32(); - crc32.update(writeBuffer.flip().position(HEADER_BYTES)); + final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length)); // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer. - writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue()); - fileWriter.commitWrite(position, writeBuffer.rewind()); + fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum)); // Update the last entry with the correct index/term/length. currentPosition = nextPosition; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java index be6c6ba831..432f9b9dec 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentEntry.java @@ -16,14 +16,16 @@ package io.atomix.storage.journal; import java.nio.ByteBuffer; +import java.util.zip.CRC32; +import org.eclipse.jdt.annotation.NonNull; /** * An {@link Indexed} entry read from {@link JournalSegment}. * - * @param checksum The CRC32 checksum of data + * @param checksum The {@link CRC32} checksum of data * @param bytes Entry bytes */ -record SegmentEntry(int checksum, ByteBuffer bytes) { +record SegmentEntry(int checksum, @NonNull ByteBuffer bytes) { /** * The size of the header, comprising of: *
    @@ -38,4 +40,16 @@ record SegmentEntry(int checksum, ByteBuffer bytes) { throw new IllegalArgumentException("Invalid entry bytes " + bytes); } } + + /** + * Compute the {@link CRC32} checksum of a buffer. Note that the buffer will be consumed during this process. + * + * @param bytes buffer to checksum + * @return the checksum + */ + static int computeChecksum(final ByteBuffer bytes) { + final var crc32 = new CRC32(); + crc32.update(bytes); + return (int) crc32.getValue(); + } } -- 2.36.6 From 587306204aa99b626f00a77b073bdd60b2eacb78 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 01:49:50 +0200 Subject: [PATCH 04/16] Add SparseJournalIndex.toString() Improve developer experience by noting positions. JIRA: CONTROLLER-2115 Change-Id: I1c8d9618ceec6775b22b3bd129ecbc9be8a412d1 Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../io/atomix/storage/journal/index/SparseJournalIndex.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java b/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java index 2b317362c5..6da3ba4ed4 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java @@ -16,6 +16,7 @@ */ package io.atomix.storage.journal.index; +import com.google.common.base.MoreObjects; import java.util.TreeMap; /** @@ -52,4 +53,9 @@ public final class SparseJournalIndex implements JournalIndex { positions.tailMap(index, false).clear(); return Position.ofNullable(positions.lastEntry()); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("positions", positions).toString(); + } } -- 2.36.6 From b4faeefcc97ea4dc680f72587ec5ed617ee85746 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 02:09:32 +0200 Subject: [PATCH 05/16] Centralize IO buffer 
allocation JournalSegmentFile.maxSize() goes into the algorithm to choose I/O size. Let's move the method there. JIRA: CONTROLLER-2115 Change-Id: I9f0810cb9769217965375ee70c7e3e231adbf1d9 Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../storage/journal/DiskFileReader.java | 22 +------------------ .../storage/journal/DiskFileWriter.java | 2 +- .../storage/journal/JournalSegmentFile.java | 21 ++++++++++++++++++ 3 files changed, 23 insertions(+), 22 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java index a934a90308..55f95f4d51 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileReader.java @@ -26,11 +26,6 @@ import org.eclipse.jdt.annotation.NonNull; * A {@link StorageLevel#DISK} implementation of {@link FileReader}. Maintains an internal buffer. */ final class DiskFileReader extends FileReader { - /** - * Just do not bother with IO smaller than this many bytes. 
- */ - private static final int MIN_IO_SIZE = 8192; - private final FileChannel channel; private final ByteBuffer buffer; @@ -38,7 +33,7 @@ final class DiskFileReader extends FileReader { private int bufferPosition; DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) { - this(file, allocateBuffer(file.maxSize(), maxEntrySize)); + this(file, file.allocateBuffer(maxEntrySize)); } // Note: take ownership of the buffer @@ -49,21 +44,6 @@ final class DiskFileReader extends FileReader { bufferPosition = 0; } - static ByteBuffer allocateBuffer(final int maxSegmentSize, final int maxEntrySize) { - return ByteBuffer.allocate(chooseBufferSize(maxSegmentSize, maxEntrySize)); - } - - private static int chooseBufferSize(final int maxSegmentSize, final int maxEntrySize) { - if (maxSegmentSize <= MIN_IO_SIZE) { - // just buffer the entire segment - return maxSegmentSize; - } - - // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries - final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES; - return minBufferSize <= MIN_IO_SIZE ? 
MIN_IO_SIZE : minBufferSize; - } - @Override void invalidateCache() { buffer.clear().flip(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java index 74fb2d8be8..0cc6454c2b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/DiskFileWriter.java @@ -35,7 +35,7 @@ final class DiskFileWriter extends FileWriter { DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) { super(file, maxEntrySize); channel = file.channel(); - buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize); + buffer = file.allocateBuffer(maxEntrySize); reader = new DiskFileReader(file, buffer); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index 04e22a1659..d096b33fd8 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -21,6 +21,7 @@ import com.google.common.base.MoreObjects; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; @@ -34,6 +35,10 @@ final class JournalSegmentFile { private static final char PART_SEPARATOR = '-'; private static final char EXTENSION_SEPARATOR = '.'; private static final String EXTENSION = "log"; + /** + * Just do not bother with IO smaller than this many bytes. 
+ */ + private static final int MIN_IO_SIZE = 8192; private final @NonNull JournalSegmentDescriptor descriptor; private final @NonNull Path path; @@ -126,11 +131,27 @@ final class JournalSegmentFile { file.close(); } + ByteBuffer allocateBuffer(final int maxEntrySize) { + return ByteBuffer.allocate(chooseBufferSize(maxEntrySize)); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString(); } + private int chooseBufferSize(final int maxEntrySize) { + final int maxSegmentSize = maxSize(); + if (maxSegmentSize <= MIN_IO_SIZE) { + // just buffer the entire segment + return maxSegmentSize; + } + + // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries + final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES; + return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize; + } + /** * Returns a boolean value indicating whether the given file appears to be a parsable segment file. * -- 2.36.6 From 27c1f8ccaf729f7a53d49f2afa519adff62f6f76 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 02:52:44 +0200 Subject: [PATCH 06/16] Add JournalSegmentFile.map() We already provide a hub for I/O operations. Mapping a file is one such operation. 
JIRA: CONTROLLER-2099 Change-Id: I4fba610eac00739691849454bb99f99796c24e9c Signed-off-by: Robert Varga --- .../atomix/storage/journal/JournalSegmentFile.java | 12 ++++++++++++ .../io/atomix/storage/journal/MappedFileWriter.java | 11 +++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java index d096b33fd8..134b5233c0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentFile.java @@ -22,7 +22,9 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.nio.file.Path; import org.eclipse.jdt.annotation.NonNull; @@ -127,6 +129,16 @@ final class JournalSegmentFile { return file.getChannel(); } + /** + * Map the contents of the file into memory. 
+ * + * @return A {@link MappedByteBuffer} + * @throws IOException if an I/O error occurs + */ + @NonNull MappedByteBuffer map() throws IOException { + return channel().map(MapMode.READ_WRITE, 0, maxSize()); + } + void close() throws IOException { file.close(); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java index f91cdc827a..a8877fabf7 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/MappedFileWriter.java @@ -19,7 +19,6 @@ import io.netty.util.internal.PlatformDependent; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; import org.eclipse.jdt.annotation.NonNull; /** @@ -33,17 +32,13 @@ final class MappedFileWriter extends FileWriter { MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) { super(file, maxEntrySize); - mappedBuffer = mapBuffer(file.channel(), file.maxSize()); - buffer = mappedBuffer.slice(); - reader = new MappedFileReader(file, mappedBuffer); - } - - private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) { try { - return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize); + mappedBuffer = file.map(); } catch (IOException e) { throw new StorageException(e); } + buffer = mappedBuffer.slice(); + reader = new MappedFileReader(file, mappedBuffer); } @Override -- 2.36.6 From 22a8145fad85a5ac5e9acff6eaf6b41345faa5ac Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 19:54:02 +0200 Subject: [PATCH 07/16] Deprecate more of serdes The JournalSerdes is deprecated for removal, as are all the classes directly related to it. This patch applies the second part of that sentence. 
Change-Id: I0ddb7293dfef539526ecb46598db7e194bdaf8f1 Signed-off-by: Robert Varga --- .../main/java/io/atomix/utils/serializer/EntrySerializer.java | 1 + .../src/main/java/io/atomix/utils/serializer/KryoEntryInput.java | 1 + .../main/java/io/atomix/utils/serializer/KryoEntryOutput.java | 1 + .../main/java/io/atomix/utils/serializer/KryoJournalSerdes.java | 1 + .../io/atomix/utils/serializer/KryoJournalSerdesBuilder.java | 1 + .../src/main/java/io/atomix/utils/serializer/RegisteredType.java | 1 + .../test/java/io/atomix/storage/journal/AbstractJournalTest.java | 1 + .../src/test/java/io/atomix/storage/journal/TestEntrySerdes.java | 1 + 8 files changed, 8 insertions(+) diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/EntrySerializer.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/EntrySerializer.java index 0508f1eee5..f8355244b1 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/EntrySerializer.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/EntrySerializer.java @@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects; import io.atomix.storage.journal.JournalSerdes.EntrySerdes; import java.io.IOException; +@Deprecated(forRemoval = true, since="9.0.3") final class EntrySerializer extends Serializer { // Note: uses identity to create things in Kryo, hence we want an instance for every serdes we wrap private final JavaSerializer javaSerializer = new JavaSerializer(); diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryInput.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryInput.java index 2a98f16073..1c0e500928 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryInput.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryInput.java @@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import io.atomix.storage.journal.JournalSerdes.EntryInput; import java.io.IOException; 
+@Deprecated(forRemoval = true, since="9.0.3") final class KryoEntryInput implements EntryInput { private final Kryo kryo; private final Input input; diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryOutput.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryOutput.java index 90886dde03..5e91332906 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryOutput.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoEntryOutput.java @@ -24,6 +24,7 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import io.atomix.storage.journal.JournalSerdes.EntryOutput; import java.io.IOException; +@Deprecated(forRemoval = true, since="9.0.3") final class KryoEntryOutput implements EntryOutput { private final Kryo kryo; private final Output output; diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java index 64f35389c1..7742f981c6 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; /** * Pool of Kryo instances, with classes pre-registered. */ +@Deprecated(forRemoval = true, since="9.0.3") final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool { /** * Default buffer size used for serialization. 
diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdesBuilder.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdesBuilder.java index a62d8b3293..0caf7eaca6 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdesBuilder.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdesBuilder.java @@ -25,6 +25,7 @@ import io.atomix.storage.journal.JournalSerdes.EntrySerdes; import java.util.ArrayList; import java.util.List; +@Deprecated(forRemoval = true, since="9.0.3") public final class KryoJournalSerdesBuilder implements Builder { private final List types = new ArrayList<>(); private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); diff --git a/atomix-storage/src/main/java/io/atomix/utils/serializer/RegisteredType.java b/atomix-storage/src/main/java/io/atomix/utils/serializer/RegisteredType.java index 0a17c09bf2..79cccbe4e3 100644 --- a/atomix-storage/src/main/java/io/atomix/utils/serializer/RegisteredType.java +++ b/atomix-storage/src/main/java/io/atomix/utils/serializer/RegisteredType.java @@ -17,6 +17,7 @@ package io.atomix.utils.serializer; import static java.util.Objects.requireNonNull; +@Deprecated(forRemoval = true, since="9.0.3") record RegisteredType(EntrySerializer serializer, Class[] types) { RegisteredType { requireNonNull(serializer); diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java index 487c314141..d4bc43d9b2 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java @@ -45,6 +45,7 @@ import org.junit.runners.Parameterized; */ @RunWith(Parameterized.class) public abstract class AbstractJournalTest { + @Deprecated(forRemoval = true, since="9.0.3") private static final 
JournalSerdes NAMESPACE = JournalSerdes.builder() .register(new TestEntrySerdes(), TestEntry.class) .register(new ByteArraySerdes(), byte[].class) diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/TestEntrySerdes.java b/atomix-storage/src/test/java/io/atomix/storage/journal/TestEntrySerdes.java index 8b04539bd7..8ef4183261 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/TestEntrySerdes.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/TestEntrySerdes.java @@ -20,6 +20,7 @@ import io.atomix.storage.journal.JournalSerdes.EntryOutput; import io.atomix.storage.journal.JournalSerdes.EntrySerdes; import java.io.IOException; +@Deprecated(forRemoval = true, since="9.0.3") final class TestEntrySerdes implements EntrySerdes { private static final ByteArraySerdes BA_SERIALIZER = new ByteArraySerdes(); -- 2.36.6 From 30ffcf38d4a9f164c54a63d1f0b997250c258ac3 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 20:46:07 +0200 Subject: [PATCH 08/16] Deprecate ByteArraySerdes This is another serdes, it should be deprecated. 
Change-Id: I6eab2a136cb1bc3f1f4629561752d8abcb26db56 Signed-off-by: Robert Varga --- .../src/test/java/io/atomix/storage/journal/ByteArraySerdes.java | 1 + 1 file changed, 1 insertion(+) diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/ByteArraySerdes.java b/atomix-storage/src/test/java/io/atomix/storage/journal/ByteArraySerdes.java index 79ce9097a3..c8789b829f 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/ByteArraySerdes.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/ByteArraySerdes.java @@ -20,6 +20,7 @@ import io.atomix.storage.journal.JournalSerdes.EntryOutput; import io.atomix.storage.journal.JournalSerdes.EntrySerdes; import java.io.IOException; +@Deprecated(forRemoval = true, since="9.0.3") final class ByteArraySerdes implements EntrySerdes { @Override public byte[] read(final EntryInput input) throws IOException { -- 2.36.6 From bc005c333cb76e64b48eac215f7f8b938f7a4142 Mon Sep 17 00:00:00 2001 From: Ruslan Kashapov Date: Mon, 22 Apr 2024 17:24:04 +0300 Subject: [PATCH 09/16] Separate byte-level atomic-storage access Byte level functionality was moved into *ByteJournal* artifacts and now can be accessed independently. SegmentedJournal is now acts as a type serialization layer on top of ByteJournal. // FIXME: refactor SegmentedJournal.Builder (in a subsequent patch?) 
JIRA: CONTROLLER-2115 Change-Id: I2e4941bda3af76f0cd59e8c545131af85c668010 Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../storage/journal/ByteBufJournal.java | 51 + ...tJournalReader.java => ByteBufMapper.java} | 26 +- .../atomix/storage/journal/ByteBufReader.java | 80 ++ .../atomix/storage/journal/ByteBufWriter.java | 79 ++ .../atomix/storage/journal/JournalReader.java | 4 +- .../storage/journal/JournalSegmentReader.java | 5 +- .../storage/journal/JournalSegmentWriter.java | 2 +- .../atomix/storage/journal/JournalSerdes.java | 25 +- .../storage/journal/JournalSerializer.java | 48 - .../atomix/storage/journal/JournalWriter.java | 2 + .../journal/SegmentedByteBufJournal.java | 596 +++++++++++ .../journal/SegmentedByteBufReader.java | 149 +++ .../journal/SegmentedByteBufWriter.java | 110 ++ .../SegmentedCommitsByteBufReader.java | 24 + .../storage/journal/SegmentedJournal.java | 941 ++++-------------- .../journal/SegmentedJournalReader.java | 131 +-- .../journal/SegmentedJournalWriter.java | 106 +- 17 files changed, 1407 insertions(+), 972 deletions(-) create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java rename atomix-storage/src/main/java/io/atomix/storage/journal/{CommitsSegmentJournalReader.java => ByteBufMapper.java} (59%) create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java delete mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java create mode 100644 
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java new file mode 100644 index 0000000000..baaa6b0ba9 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * A journal of byte arrays. Provides the ability to write modify entries via {@link ByteBufWriter} and read them + * back via {@link ByteBufReader}. + */ +@NonNullByDefault +public interface ByteBufJournal extends AutoCloseable { + /** + * Returns the journal writer. + * + * @return The journal writer. + */ + ByteBufWriter writer(); + + /** + * Opens a new {@link ByteBufReader} reading all entries. + * + * @param index The index at which to start the reader. + * @return A new journal reader. + */ + ByteBufReader openReader(long index); + + /** + * Opens a new {@link ByteBufReader} reading only committed entries. + * + * @param index The index at which to start the reader. + * @return A new journal reader. 
+ */ + ByteBufReader openCommitsReader(long index); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java similarity index 59% rename from atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java rename to atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java index 767e67fa46..cabd48d8bd 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java @@ -15,19 +15,27 @@ */ package io.atomix.storage.journal; +import io.netty.buffer.ByteBuf; import org.eclipse.jdt.annotation.NonNullByDefault; /** - * A {@link JournalReader} traversing only committed entries. + * Support for serialization of {@link ByteBufJournal} entries. */ @NonNullByDefault -final class CommitsSegmentJournalReader extends SegmentedJournalReader { - CommitsSegmentJournalReader(final SegmentedJournal journal, final JournalSegment segment) { - super(journal, segment); - } +public interface ByteBufMapper { + /** + * Converts an object into a series of bytes in a {@link ByteBuf}. + * + * @param obj the object + * @return resulting buffer + */ + ByteBuf objectToBytes(T obj) ; - @Override - public T tryNext(final EntryMapper mapper) { - return getNextIndex() <= journal.getCommitIndex() ? super.tryNext(mapper) : null; - } + /** + * Converts the contents of a {@link ByteBuf} to an object. 
+ * + * @param buf buffer to convert + * @return resulting object + */ + T bytesToObject(ByteBuf buf); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java new file mode 100644 index 0000000000..1ebe81eef6 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * A reader of {@link ByteBufJournal} entries. + */ +@NonNullByDefault +public interface ByteBufReader extends AutoCloseable { + /** + * A journal entry processor. Responsible for transforming bytes into their internal representation. + * + * @param Internal representation type + */ + @FunctionalInterface + interface EntryMapper { + /** + * Process an entry. + * + * @param index entry index + * @param bytes entry bytes + * @return resulting internal representation + */ + T mapEntry(long index, ByteBuf bytes); + } + + /** + * Returns the first index in the journal. + * + * @return The first index in the journal + */ + long firstIndex(); + + /** + * Returns the next reader index. 
+ * + * @return The next reader index + */ + long nextIndex(); + + /** + * Try to move to the next binary data block + * + * @param entryMapper callback to be invoked on binary data + * @return processed binary data, or {@code null} + */ + @Nullable T tryNext(EntryMapper entryMapper); + + /** + * Resets the reader to the start. + */ + void reset(); + + /** + * Resets the reader to the given index. + * + * @param index The index to which to reset the reader + */ + void reset(long index); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java new file mode 100644 index 0000000000..7211a8844d --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * A writer of {@link ByteBufJournal} entries. + */ +@NonNullByDefault +public interface ByteBufWriter { + /** + * Returns the last written index. + * + * @return The last written index + */ + long lastIndex(); + + /** + * Returns the next index to be written. 
+ * + * @return The next index to be written + */ + long nextIndex(); + + /** + * Appends an entry to the journal. + * + * @param bytes Data block to append + * @return The index of appended data block + */ + // FIXME: throws IOException + long append(ByteBuf bytes); + + /** + * Commits entries up to the given index. + * + * @param index The index up to which to commit entries. + */ + void commit(long index); + + /** + * Resets the head of the journal to the given index. + * + * @param index The index to which to reset the head of the journal + */ + // FIXME: reconcile with reader's reset and truncate() + // FIXME: throws IOException + void reset(long index); + + /** + * Truncates the log to the given index. + * + * @param index The index to which to truncate the log. + */ + // FIXME: reconcile with reset() + // FIXME: throws IOException + void truncate(long index); + + /** + * Flushes written entries to disk. + */ + // FIXME: throws IOException + void flush(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java index a3c6ea5366..635f6248c4 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -75,10 +75,10 @@ public interface JournalReader extends AutoCloseable { /** * Try to move to the next entry. * - * @param mapper callback to be invoked for the entry + * @param entryMapper callback to be invoked for the entry * @return processed entry, or {@code null} */ - @Nullable T tryNext(EntryMapper mapper); + @Nullable T tryNext(EntryMapper entryMapper); /** * Resets the reader to the start. 
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index bba89dfdc9..aa4c0da18a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -72,10 +72,9 @@ final class JournalSegmentReader { /** * Reads the next binary data block * - * @param index entry index * @return The binary data, or {@code null} */ - @Nullable ByteBuf readBytes(final long index) { + @Nullable ByteBuf readBytes() { // Check if there is enough in the buffer remaining final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES; if (remaining < 0) { @@ -112,7 +111,7 @@ final class JournalSegmentReader { position += SegmentEntry.HEADER_BYTES + length; // rewind and return - return Unpooled.buffer(length).writeBytes(entryBuffer.rewind()); + return Unpooled.wrappedBuffer(entryBuffer.rewind()); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 63f5303ecc..dbf6aec214 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -144,7 +144,7 @@ final class JournalSegmentWriter { reader.setPosition(JournalSegmentDescriptor.BYTES); while (index == 0 || nextIndex <= index) { - final var buf = reader.readBytes(nextIndex); + final var buf = reader.readBytes(); if (buf == null) { break; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java index a970882edf..ffdc985827 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java +++ 
b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java @@ -19,6 +19,9 @@ package io.atomix.storage.journal; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import io.atomix.utils.serializer.KryoJournalSerdesBuilder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -27,7 +30,7 @@ import java.nio.ByteBuffer; /** * Support for serialization of {@link Journal} entries. * - * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead. + * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead. */ @Deprecated(forRemoval = true, since="9.0.3") public interface JournalSerdes { @@ -110,6 +113,26 @@ public interface JournalSerdes { */ T deserialize(final InputStream stream, final int bufferSize); + /** + * Returns a {@link ByteBufMapper} backed by this object. + * + * @return a {@link ByteBufMapper} backed by this object + */ + default ByteBufMapper toMapper() { + return new ByteBufMapper<>() { + @Override + public ByteBuf objectToBytes(final T obj) { + return Unpooled.wrappedBuffer(serialize(obj)); + } + + @Override + public T bytesToObject(final ByteBuf buf) { + // FIXME: ByteBufUtil creates a copy -- we do not want to do that! + return deserialize(ByteBufUtil.getBytes(buf)); + } + }; + } + /** * Creates a new {@link JournalSerdes} builder. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java deleted file mode 100644 index eff9af8559..0000000000 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved. 
- * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package io.atomix.storage.journal; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; - -/** - * Support for serialization of {@link Journal} entries. - */ -public interface JournalSerializer { - - /** - * Serializes given object to byte array. - * - * @param obj Object to serialize - * @return serialized bytes as {@link ByteBuf} - */ - ByteBuf serialize(T obj) ; - - /** - * Deserializes given byte array to Object. - * - * @param buf serialized bytes as {@link ByteBuf} - * @return deserialized Object - */ - T deserialize(final ByteBuf buf); - - static JournalSerializer wrap(final JournalSerdes serdes) { - return new JournalSerializer<>() { - @Override - public ByteBuf serialize(final E obj) { - return Unpooled.wrappedBuffer(serdes.serialize(obj)); - } - - @Override - public E deserialize(final ByteBuf buf) { - return serdes.deserialize(ByteBufUtil.getBytes(buf)); - } - }; - } -} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index 064fd019ec..ba7c5821aa 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -57,6 +57,7 @@ public interface JournalWriter { * * @param index the index to which to reset the head of the journal */ + // FIXME: reconcile with reader's reset and truncate() void reset(long index); /** @@ -64,6 +65,7 @@ public interface JournalWriter { * * @param index The index to which to truncate the log. 
*/ + // FIXME: reconcile with reset() void truncate(long index); /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java new file mode 100644 index 0000000000..3ae64ea82e --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -0,0 +1,596 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.function.BiFunction; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link ByteBufJournal} Implementation. 
+ */ +public final class SegmentedByteBufJournal implements ByteBufJournal { + private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class); + private static final int SEGMENT_BUFFER_FACTOR = 3; + + private final ConcurrentNavigableMap segments = new ConcurrentSkipListMap<>(); + private final Collection readers = ConcurrentHashMap.newKeySet(); + private final String name; + private final StorageLevel storageLevel; + private final File directory; + private final int maxSegmentSize; + private final int maxEntrySize; + private final double indexDensity; + private final boolean flushOnCommit; + private final @NonNull ByteBufWriter writer; + + private JournalSegment currentSegment; + private volatile long commitIndex; + + public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory, + final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) { + this.name = requireNonNull(name, "name cannot be null"); + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); + this.directory = requireNonNull(directory, "directory cannot be null"); + this.maxSegmentSize = maxSegmentSize; + this.maxEntrySize = maxEntrySize; + this.indexDensity = indexDensity; + this.flushOnCommit = flushOnCommit; + open(); + writer = new SegmentedByteBufWriter(this); + } + + /** + * Returns the total size of the journal. 
+ * + * @return the total size of the journal + */ + public long size() { + return segments.values().stream() + .mapToLong(segment -> { + try { + return segment.file().size(); + } catch (IOException e) { + throw new StorageException(e); + } + }) + .sum(); + } + + @Override + public ByteBufWriter writer() { + return writer; + } + + @Override + public ByteBufReader openReader(final long index) { + return openReader(index, SegmentedByteBufReader::new); + } + + @NonNullByDefault + private ByteBufReader openReader(final long index, + final BiFunction constructor) { + final var reader = constructor.apply(this, segment(index)); + reader.reset(index); + readers.add(reader); + return reader; + } + + @Override + public ByteBufReader openCommitsReader(final long index) { + return openReader(index, SegmentedCommitsByteBufReader::new); + } + + /** + * Opens the segments. + */ + private synchronized void open() { + // Load existing log segments from disk. + for (var segment : loadSegments()) { + segments.put(segment.firstIndex(), segment); + } + // If a segment doesn't already exist, create an initial segment starting at index 1. + if (segments.isEmpty()) { + currentSegment = createSegment(1, 1); + segments.put(1L, currentSegment); + } else { + currentSegment = segments.lastEntry().getValue(); + } + } + + /** + * Asserts that the manager is open. + * + * @throws IllegalStateException if the segment manager is not open + */ + private void assertOpen() { + checkState(currentSegment != null, "journal not open"); + } + + /** + * Asserts that enough disk space is available to allocate a new segment. + */ + private void assertDiskSpace() { + if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) { + throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment"); + } + } + + /** + * Resets the current segment, creating a new segment if necessary. 
+ */ + private synchronized void resetCurrentSegment() { + final var lastSegment = lastSegment(); + if (lastSegment == null) { + currentSegment = createSegment(1, 1); + segments.put(1L, currentSegment); + } else { + currentSegment = lastSegment; + } + } + + /** + * Resets and returns the first segment in the journal. + * + * @param index the starting index of the journal + * @return the first segment + */ + JournalSegment resetSegments(final long index) { + assertOpen(); + + // If the index already equals the first segment index, skip the reset. + final var firstSegment = firstSegment(); + if (index == firstSegment.firstIndex()) { + return firstSegment; + } + + segments.values().forEach(JournalSegment::delete); + segments.clear(); + + currentSegment = createSegment(1, index); + segments.put(index, currentSegment); + return currentSegment; + } + + /** + * Returns the first segment in the log. + * + * @throws IllegalStateException if the segment manager is not open + */ + JournalSegment firstSegment() { + assertOpen(); + final var firstEntry = segments.firstEntry(); + return firstEntry != null ? firstEntry.getValue() : nextSegment(); + } + + /** + * Returns the last segment in the log. + * + * @throws IllegalStateException if the segment manager is not open + */ + JournalSegment lastSegment() { + assertOpen(); + final var lastEntry = segments.lastEntry(); + return lastEntry != null ? lastEntry.getValue() : nextSegment(); + } + + /** + * Creates and returns the next segment. + * + * @return The next segment. + * @throws IllegalStateException if the segment manager is not open + */ + synchronized JournalSegment nextSegment() { + assertOpen(); + assertDiskSpace(); + + final var index = currentSegment.lastIndex() + 1; + final var lastSegment = lastSegment(); + currentSegment = createSegment(lastSegment != null ? 
lastSegment.file().segmentId() + 1 : 1, index); + segments.put(index, currentSegment); + return currentSegment; + } + + /** + * Returns the segment following the segment with the given ID. + * + * @param index The segment index with which to look up the next segment. + * @return The next segment for the given index. + */ + JournalSegment nextSegment(final long index) { + final var higherEntry = segments.higherEntry(index); + return higherEntry != null ? higherEntry.getValue() : null; + } + + /** + * Returns the segment for the given index. + * + * @param index The index for which to return the segment. + * @throws IllegalStateException if the segment manager is not open + */ + synchronized JournalSegment segment(final long index) { + assertOpen(); + // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup. + if (currentSegment != null && index > currentSegment.firstIndex()) { + return currentSegment; + } + + // If the index is in another segment, get the entry with the next lowest first index. + final var segment = segments.floorEntry(index); + return segment != null ? segment.getValue() : firstSegment(); + } + + /** + * Removes a segment. + * + * @param segment The segment to remove. + */ + synchronized void removeSegment(final JournalSegment segment) { + segments.remove(segment.firstIndex()); + segment.delete(); + resetCurrentSegment(); + } + + /** + * Creates a new segment. 
+ */ + JournalSegment createSegment(final long id, final long index) { + final JournalSegmentFile file; + try { + file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() + .withId(id) + .withIndex(index) + .withMaxSegmentSize(maxSegmentSize) + // FIXME: propagate maxEntries + .withMaxEntries(Integer.MAX_VALUE) + .withUpdated(System.currentTimeMillis()) + .build()); + } catch (IOException e) { + throw new StorageException(e); + } + + final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity); + LOG.debug("Created segment: {}", segment); + return segment; + } + + /** + * Loads all segments from disk. + * + * @return A collection of segments for the log. + */ + protected Collection loadSegments() { + // Ensure log directories are created. + directory.mkdirs(); + + final var segmentsMap = new TreeMap(); + + // Iterate through all files in the log directory. + for (var file : directory.listFiles(File::isFile)) { + + // If the file looks like a segment file, attempt to load the segment. + if (JournalSegmentFile.isSegmentFile(name, file)) { + final JournalSegmentFile segmentFile; + try { + segmentFile = JournalSegmentFile.openExisting(file.toPath()); + } catch (IOException e) { + throw new StorageException(e); + } + + // Load the segment. + LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path()); + + // Add the segment to the segments list. + final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); + segments.put(segment.firstIndex(), segment); + } + } + + // Verify that all the segments in the log align with one another. + JournalSegment previousSegment = null; + boolean corrupted = false; + for (var iterator = segmentsMap.entrySet().iterator(); iterator.hasNext(); ) { + final var segment = iterator.next().getValue(); + if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) { + LOG.warn("Journal is inconsistent. 
{} is not aligned with prior segment {}", segment.file().path(), + previousSegment.file().path()); + corrupted = true; + } + if (corrupted) { + segment.delete(); + iterator.remove(); + } + previousSegment = segment; + } + + return segmentsMap.values(); + } + + /** + * Resets journal readers to the given head. + * + * @param index The index at which to reset readers. + */ + void resetHead(final long index) { + for (var reader : readers) { + if (reader.nextIndex() < index) { + reader.reset(index); + } + } + } + + /** + * Resets journal readers to the given tail. + * + * @param index The index at which to reset readers. + */ + void resetTail(final long index) { + for (var reader : readers) { + if (reader.nextIndex() >= index) { + reader.reset(index); + } + } + } + + void closeReader(final SegmentedByteBufReader reader) { + readers.remove(reader); + } + + /** + * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index. + * + * @param index the index from which to remove segments + * @return indicates whether a segment can be removed from the journal + */ + public boolean isCompactable(final long index) { + final var segmentEntry = segments.floorEntry(index); + return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0; + } + + /** + * Returns the index of the last segment in the log. + * + * @param index the compaction index + * @return the starting index of the last segment in the log + */ + public long getCompactableIndex(final long index) { + final var segmentEntry = segments.floorEntry(index); + return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0; + } + + /** + * Compacts the journal up to the given index. + *

    + * The semantics of compaction are not specified by this interface. + * + * @param index The index up to which to compact the journal. + */ + public void compact(final long index) { + final var segmentEntry = segments.floorEntry(index); + if (segmentEntry != null) { + final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex()); + if (!compactSegments.isEmpty()) { + LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); + compactSegments.values().forEach(JournalSegment::delete); + compactSegments.clear(); + resetHead(segmentEntry.getValue().firstIndex()); + } + } + } + + @Override + public void close() { + if (currentSegment != null) { + currentSegment = null; + segments.values().forEach(JournalSegment::close); + segments.clear(); + } + } + + /** + * Returns whether {@code flushOnCommit} is enabled for the log. + * + * @return Indicates whether {@code flushOnCommit} is enabled for the log. + */ + boolean isFlushOnCommit() { + return flushOnCommit; + } + + /** + * Updates commit index to the given value. + * + * @param index The index value. + */ + void setCommitIndex(final long index) { + commitIndex = index; + } + + /** + * Returns the journal last commit index. + * + * @return The journal last commit index. + */ + long getCommitIndex() { + return commitIndex; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Segmented byte journal builder. 
+ */ + public static final class Builder { + private static final boolean DEFAULT_FLUSH_ON_COMMIT = false; + private static final String DEFAULT_NAME = "atomix"; + private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir"); + private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32; + private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; + private static final double DEFAULT_INDEX_DENSITY = .005; + + private String name = DEFAULT_NAME; + private StorageLevel storageLevel = StorageLevel.DISK; + private File directory = new File(DEFAULT_DIRECTORY); + private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; + private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; + private double indexDensity = DEFAULT_INDEX_DENSITY; + private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT; + + private Builder() { + // on purpose + } + + /** + * Sets the journal name. + * + * @param name The journal name. + * @return The builder instance + */ + public Builder withName(final String name) { + this.name = requireNonNull(name, "name cannot be null"); + return this; + } + + /** + * Sets the storage level. + * + * @param storageLevel The storage level. + * @return The builder instance + */ + public Builder withStorageLevel(final StorageLevel storageLevel) { + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); + return this; + } + + /** + * Sets the journal directory. + * + * @param directory The log directory. + * @return The builder instance + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final String directory) { + return withDirectory(new File(requireNonNull(directory, "directory cannot be null"))); + } + + /** + * Sets the journal directory + * + * @param directory The log directory. 
+ * @return The builder instance + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final File directory) { + this.directory = requireNonNull(directory, "directory cannot be null"); + return this; + } + + /** + * Sets the maximum segment size in bytes. + * By default, the maximum segment size is {@code 1024 * 1024 * 32}. + * + * @param maxSegmentSize The maximum segment size in bytes. + * @return The builder instance + * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive + */ + public Builder withMaxSegmentSize(final int maxSegmentSize) { + checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, + "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES); + this.maxSegmentSize = maxSegmentSize; + return this; + } + + /** + * Sets the maximum entry size in bytes. + * + * @param maxEntrySize the maximum entry size in bytes + * @return the builder instance + * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive + */ + public Builder withMaxEntrySize(final int maxEntrySize) { + checkArgument(maxEntrySize > 0, "maxEntrySize must be positive"); + this.maxEntrySize = maxEntrySize; + return this; + } + + /** + * Sets the journal index density. + *

    + * The index density is the frequency at which the position of entries written to the journal will be + * recorded in an in-memory index for faster seeking. + * + * @param indexDensity the index density + * @return the builder instance + * @throws IllegalArgumentException if the density is not between 0 and 1 + */ + public Builder withIndexDensity(final double indexDensity) { + checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1"); + this.indexDensity = indexDensity; + return this; + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *

    + * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time + * an entry is committed in a given segment. + * + * @return The builder instance + */ + public Builder withFlushOnCommit() { + return withFlushOnCommit(true); + } + + /** + * Sets whether to flush buffers to disk when entries are committed to a segment. + *

    + * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time + * an entry is committed in a given segment. + * + * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. + * @return The builder instance + */ + public Builder withFlushOnCommit(final boolean flushOnCommit) { + this.flushOnCommit = flushOnCommit; + return this; + } + + /** + * Build the {@link SegmentedByteBufJournal}. + * + * @return {@link SegmentedByteBufJournal} instance built. + */ + public SegmentedByteBufJournal build() { + return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize, + indexDensity, flushOnCommit); + } + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java new file mode 100644 index 0000000000..d164676845 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java @@ -0,0 +1,149 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNull; + +/** + * A {@link ByteBufReader} implementation. + */ +sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader { + final @NonNull SegmentedByteBufJournal journal; + + private JournalSegment currentSegment; + private JournalSegmentReader currentReader; + private long nextIndex; + + SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) { + this.journal = requireNonNull(journal); + currentSegment = requireNonNull(segment); + currentReader = segment.createReader(); + nextIndex = currentSegment.firstIndex(); + } + + @Override + public final long firstIndex() { + return journal.firstSegment().firstIndex(); + } + + @Override + public final long nextIndex() { + return nextIndex; + } + + @Override + public final void reset() { + currentReader.close(); + currentSegment = journal.firstSegment(); + currentReader = currentSegment.createReader(); + nextIndex = currentSegment.firstIndex(); + } + + @Override + public final void reset(final long index) { + // If the current segment is not open, it has been replaced. Reset the segments. + if (!currentSegment.isOpen()) { + reset(); + } + if (index < nextIndex) { + rewind(index); + } else if (index > nextIndex) { + forwardTo(index); + } else { + resetCurrentReader(index); + } + } + + private void resetCurrentReader(final long index) { + final var position = currentSegment.lookup(index - 1); + if (position != null) { + nextIndex = position.index(); + currentReader.setPosition(position.position()); + } else { + nextIndex = currentSegment.firstIndex(); + currentReader.setPosition(JournalSegmentDescriptor.BYTES); + } + forwardTo(index); + } + + /** + * Rewinds the journal to the given index. 
+ */ + private void rewind(final long index) { + if (currentSegment.firstIndex() >= index) { + final var segment = journal.segment(index - 1); + if (segment != null) { + currentReader.close(); + currentSegment = segment; + currentReader = currentSegment.createReader(); + } + } + resetCurrentReader(index); + } + + private void forwardTo(final long index) { + while (nextIndex < index && tryAdvance(nextIndex) != null) { + // No-op -- nextIndex value is updated in tryAdvance() + } + } + + @Override + public final T tryNext(final EntryMapper entryMapper) { + final var index = nextIndex; + final var bytes = tryAdvance(index); + return bytes == null ? null : entryMapper.mapEntry(index, bytes); + } + + /** + * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already + * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}. + * + *

    + * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by + * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}. + * + * @param index next index + * @return Entry bytes, or {@code null} + */ + ByteBuf tryAdvance(final long index) { + var buf = currentReader.readBytes(); + if (buf == null) { + final var nextSegment = journal.nextSegment(currentSegment.firstIndex()); + if (nextSegment == null || nextSegment.firstIndex() != index) { + return null; + } + currentReader.close(); + currentSegment = nextSegment; + currentReader = currentSegment.createReader(); + buf = currentReader.readBytes(); + if (buf == null) { + return null; + } + } + nextIndex = index + 1; + return buf; + } + + @Override + public final void close() { + currentReader.close(); + journal.closeReader(this); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java new file mode 100644 index 0000000000..7e92815f34 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -0,0 +1,110 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.atomix.storage.journal; + +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ByteBufWriter} implementation. + */ +final class SegmentedByteBufWriter implements ByteBufWriter { + private final SegmentedByteBufJournal journal; + + private JournalSegment currentSegment; + private JournalSegmentWriter currentWriter; + + SegmentedByteBufWriter(final SegmentedByteBufJournal journal) { + this.journal = requireNonNull(journal); + currentSegment = journal.lastSegment(); + currentWriter = currentSegment.acquireWriter(); + } + + @Override + public long lastIndex() { + return currentWriter.getLastIndex(); + } + + @Override + public long nextIndex() { + return currentWriter.getNextIndex(); + } + + @Override + public void reset(final long index) { + if (index > currentSegment.firstIndex()) { + currentSegment.releaseWriter(); + currentSegment = journal.resetSegments(index); + currentWriter = currentSegment.acquireWriter(); + } else { + truncate(index - 1); + } + journal.resetHead(index); + } + + @Override + public void commit(final long index) { + if (index > journal.getCommitIndex()) { + journal.setCommitIndex(index); + if (journal.isFlushOnCommit()) { + flush(); + } + } + } + + @Override + public long append(final ByteBuf buf) { + var index = currentWriter.append(buf); + if (index != null) { + return index; + } + // Slow path: we do not have enough capacity + currentWriter.flush(); + currentSegment.releaseWriter(); + currentSegment = journal.nextSegment(); + currentWriter = currentSegment.acquireWriter(); + return verifyNotNull(currentWriter.append(buf)); + } + + @Override + public void truncate(final long index) { + if (index < journal.getCommitIndex()) { + throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); + } + + // Delete all segments with first indexes greater than the given index. 
+ while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) { + currentSegment.releaseWriter(); + journal.removeSegment(currentSegment); + currentSegment = journal.lastSegment(); + currentWriter = currentSegment.acquireWriter(); + } + + // Truncate the current index. + currentWriter.truncate(index); + + // Reset segment readers. + journal.resetTail(index + 1); + } + + @Override + public void flush() { + currentWriter.flush(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java new file mode 100644 index 0000000000..43fae62a1e --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ByteBufReader} traversing only committed entries. + */ +final class SegmentedCommitsByteBufReader extends SegmentedByteBufReader { + SegmentedCommitsByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) { + super(journal, segment); + } + + @Override + ByteBuf tryAdvance(final long index) { + return index <= journal.getCommitIndex() ? 
super.tryAdvance(index) : null; + } +} \ No newline at end of file diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 1ae77fa351..cd07492692 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -1,5 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,772 +16,258 @@ */ package io.atomix.storage.journal; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; /** * Segmented journal. */ public final class SegmentedJournal implements Journal { - /** - * Returns a new Raft log builder. - * - * @return A new Raft log builder. 
- */ - public static Builder builder() { - return new Builder<>(); - } - - private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class); - private static final int SEGMENT_BUFFER_FACTOR = 3; - - private final String name; - private final StorageLevel storageLevel; - private final File directory; - private final JournalSerializer serializer; - private final int maxSegmentSize; - private final int maxEntrySize; - private final int maxEntriesPerSegment; - private final double indexDensity; - private final boolean flushOnCommit; - private final SegmentedJournalWriter writer; - private volatile long commitIndex; - - private final ConcurrentNavigableMap segments = new ConcurrentSkipListMap<>(); - private final Collection> readers = ConcurrentHashMap.newKeySet(); - private JournalSegment currentSegment; - - private volatile boolean open = true; - - public SegmentedJournal( - String name, - StorageLevel storageLevel, - File directory, - JournalSerdes namespace, - int maxSegmentSize, - int maxEntrySize, - int maxEntriesPerSegment, - double indexDensity, - boolean flushOnCommit) { - this.name = requireNonNull(name, "name cannot be null"); - this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); - this.directory = requireNonNull(directory, "directory cannot be null"); - this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null")); - this.maxSegmentSize = maxSegmentSize; - this.maxEntrySize = maxEntrySize; - this.maxEntriesPerSegment = maxEntriesPerSegment; - this.indexDensity = indexDensity; - this.flushOnCommit = flushOnCommit; - open(); - this.writer = new SegmentedJournalWriter<>(this); - } - - /** - * Returns the segment file name prefix. - * - * @return The segment file name prefix. - */ - public String name() { - return name; - } - - /** - * Returns the storage directory. - *

    - * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be - * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided - * when the log is opened. - * - * @return The storage directory. - */ - public File directory() { - return directory; - } - - /** - * Returns the storage level. - *

    - * The storage level dictates how entries within individual journal segments should be stored. - * - * @return The storage level. - */ - public StorageLevel storageLevel() { - return storageLevel; - } - - /** - * Returns the maximum journal segment size. - *

    - * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes. - * - * @return The maximum segment size in bytes. - */ - public int maxSegmentSize() { - return maxSegmentSize; - } - - /** - * Returns the maximum journal entry size. - *

    - * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes. - * - * @return the maximum entry size in bytes - */ - public int maxEntrySize() { - return maxEntrySize; - } - - /** - * Returns the maximum number of entries per segment. - *

    - * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment - * in a journal. - * - * @return The maximum number of entries per segment. - * @deprecated since 3.0.2 - */ - @Deprecated - public int maxEntriesPerSegment() { - return maxEntriesPerSegment; - } - - /** - * Returns the collection of journal segments. - * - * @return the collection of journal segments - */ - public Collection segments() { - return segments.values(); - } - - /** - * Returns the collection of journal segments with indexes greater than the given index. - * - * @param index the starting index - * @return the journal segments starting with indexes greater than or equal to the given index - */ - public Collection segments(long index) { - return segments.tailMap(index).values(); - } - - /** - * Returns serializer instance. - * - * @return serializer instance - */ - JournalSerializer serializer() { - return serializer; - } - - /** - * Returns the total size of the journal. - * - * @return the total size of the journal - */ - public long size() { - return segments.values().stream() - .mapToLong(segment -> { - try { - return segment.file().size(); - } catch (IOException e) { - throw new StorageException(e); - } - }) - .sum(); - } - - @Override - public JournalWriter writer() { - return writer; - } - - @Override - public JournalReader openReader(long index) { - return openReader(index, JournalReader.Mode.ALL); - } - - /** - * Opens a new Raft log reader with the given reader mode. - * - * @param index The index from which to begin reading entries. - * @param mode The mode in which to read entries. - * @return The Raft log reader. 
- */ - @Override - public JournalReader openReader(long index, JournalReader.Mode mode) { - final var segment = getSegment(index); - final var reader = switch (mode) { - case ALL -> new SegmentedJournalReader<>(this, segment); - case COMMITS -> new CommitsSegmentJournalReader<>(this, segment); - }; - - // Forward reader to specified index - long next = reader.getNextIndex(); - while (index > next && reader.tryAdvance()) { - next = reader.getNextIndex(); + private final AtomicBoolean open = new AtomicBoolean(true); + private final SegmentedByteBufJournal journal; + private final SegmentedJournalWriter writer; + private final ByteBufMapper mapper; + + public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper mapper) { + this.journal = requireNonNull(journal, "journal is required"); + this.mapper = requireNonNull(mapper, "mapper cannot be null"); + writer = new SegmentedJournalWriter<>(journal.writer(), mapper); } - readers.add(reader); - return reader; - } - - /** - * Opens the segments. - */ - private synchronized void open() { - // Load existing log segments from disk. - for (var segment : loadSegments()) { - segments.put(segment.firstIndex(), segment); + @Override + public JournalWriter writer() { + return writer; } - // If a segment doesn't already exist, create an initial segment starting at index 1. - if (!segments.isEmpty()) { - currentSegment = segments.lastEntry().getValue(); - } else { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } - } - - /** - * Asserts that the manager is open. - * - * @throws IllegalStateException if the segment manager is not open - */ - private void assertOpen() { - checkState(currentSegment != null, "journal not open"); - } - - /** - * Asserts that enough disk space is available to allocate a new segment. 
- */ - private void assertDiskSpace() { - if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) { - throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment"); - } - } - - /** - * Resets the current segment, creating a new segment if necessary. - */ - private synchronized void resetCurrentSegment() { - final var lastSegment = getLastSegment(); - if (lastSegment == null) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = lastSegment; - } - } - - /** - * Resets and returns the first segment in the journal. - * - * @param index the starting index of the journal - * @return the first segment - */ - JournalSegment resetSegments(long index) { - assertOpen(); - - // If the index already equals the first segment index, skip the reset. - final var firstSegment = getFirstSegment(); - if (index == firstSegment.firstIndex()) { - return firstSegment; - } - - segments.values().forEach(JournalSegment::delete); - segments.clear(); - - currentSegment = createSegment(1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the first segment in the log. - * - * @throws IllegalStateException if the segment manager is not open - */ - JournalSegment getFirstSegment() { - assertOpen(); - Map.Entry segment = segments.firstEntry(); - return segment != null ? segment.getValue() : null; - } - - /** - * Returns the last segment in the log. - * - * @throws IllegalStateException if the segment manager is not open - */ - JournalSegment getLastSegment() { - assertOpen(); - Map.Entry segment = segments.lastEntry(); - return segment != null ? segment.getValue() : null; - } - - /** - * Creates and returns the next segment. - * - * @return The next segment. 
- * @throws IllegalStateException if the segment manager is not open - */ - synchronized JournalSegment getNextSegment() { - assertOpen(); - assertDiskSpace(); - - final var index = currentSegment.lastIndex() + 1; - final var lastSegment = getLastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the segment following the segment with the given ID. - * - * @param index The segment index with which to look up the next segment. - * @return The next segment for the given index. - */ - JournalSegment getNextSegment(long index) { - Map.Entry nextSegment = segments.higherEntry(index); - return nextSegment != null ? nextSegment.getValue() : null; - } - - /** - * Returns the segment for the given index. - * - * @param index The index for which to return the segment. - * @throws IllegalStateException if the segment manager is not open - */ - synchronized JournalSegment getSegment(long index) { - assertOpen(); - // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup. - if (currentSegment != null && index > currentSegment.firstIndex()) { - return currentSegment; - } - - // If the index is in another segment, get the entry with the next lowest first index. - Map.Entry segment = segments.floorEntry(index); - if (segment != null) { - return segment.getValue(); - } - return getFirstSegment(); - } - - /** - * Removes a segment. - * - * @param segment The segment to remove. - */ - synchronized void removeSegment(JournalSegment segment) { - segments.remove(segment.firstIndex()); - segment.delete(); - resetCurrentSegment(); - } - - /** - * Creates a new segment. 
- */ - JournalSegment createSegment(long id, long index) { - final JournalSegmentFile file; - try { - file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() - .withId(id) - .withIndex(index) - .withMaxSegmentSize(maxSegmentSize) - .withMaxEntries(maxEntriesPerSegment) - .withUpdated(System.currentTimeMillis()) - .build()); - } catch (IOException e) { - throw new StorageException(e); - } - - final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity); - LOG.debug("Created segment: {}", segment); - return segment; - } - - /** - * Loads all segments from disk. - * - * @return A collection of segments for the log. - */ - protected Collection loadSegments() { - // Ensure log directories are created. - directory.mkdirs(); - - final var segments = new TreeMap(); - - // Iterate through all files in the log directory. - for (var file : directory.listFiles(File::isFile)) { - - // If the file looks like a segment file, attempt to load the segment. - if (JournalSegmentFile.isSegmentFile(name, file)) { - final JournalSegmentFile segmentFile; - try { - segmentFile = JournalSegmentFile.openExisting(file.toPath()); - } catch (IOException e) { - throw new StorageException(e); - } - - // Load the segment. - LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path()); - - // Add the segment to the segments list. - final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); - segments.put(segment.firstIndex(), segment); - } - } - - // Verify that all the segments in the log align with one another. - JournalSegment previousSegment = null; - boolean corrupted = false; - final var iterator = segments.entrySet().iterator(); - while (iterator.hasNext()) { - final var segment = iterator.next().getValue(); - if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) { - LOG.warn("Journal is inconsistent. 
{} is not aligned with prior segment {}", segment.file().path(), - previousSegment.file().path()); - corrupted = true; - } - if (corrupted) { - segment.delete(); - iterator.remove(); - } - previousSegment = segment; - } - - return segments.values(); - } - - /** - * Resets journal readers to the given head. - * - * @param index The index at which to reset readers. - */ - void resetHead(long index) { - for (var reader : readers) { - if (reader.getNextIndex() < index) { - reader.reset(index); - } - } - } - - /** - * Resets journal readers to the given tail. - * - * @param index The index at which to reset readers. - */ - void resetTail(long index) { - for (var reader : readers) { - if (reader.getNextIndex() >= index) { - reader.reset(index); - } - } - } - - void closeReader(SegmentedJournalReader reader) { - readers.remove(reader); - } - - @Override - public boolean isOpen() { - return open; - } - - /** - * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index. - * - * @param index the index from which to remove segments - * @return indicates whether a segment can be removed from the journal - */ - public boolean isCompactable(long index) { - Map.Entry segmentEntry = segments.floorEntry(index); - return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0; - } - - /** - * Returns the index of the last segment in the log. - * - * @param index the compaction index - * @return the starting index of the last segment in the log - */ - public long getCompactableIndex(long index) { - Map.Entry segmentEntry = segments.floorEntry(index); - return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0; - } - - /** - * Compacts the journal up to the given index. - *

    - * The semantics of compaction are not specified by this interface. - * - * @param index The index up to which to compact the journal. - */ - public void compact(long index) { - final var segmentEntry = segments.floorEntry(index); - if (segmentEntry != null) { - final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex()); - if (!compactSegments.isEmpty()) { - LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); - compactSegments.values().forEach(JournalSegment::delete); - compactSegments.clear(); - resetHead(segmentEntry.getValue().firstIndex()); - } - } - } - - @Override - public void close() { - segments.values().forEach(JournalSegment::close); - currentSegment = null; - open = false; - } - - /** - * Returns whether {@code flushOnCommit} is enabled for the log. - * - * @return Indicates whether {@code flushOnCommit} is enabled for the log. - */ - boolean isFlushOnCommit() { - return flushOnCommit; - } - - /** - * Commits entries up to the given index. - * - * @param index The index up to which to commit entries. - */ - void setCommitIndex(long index) { - this.commitIndex = index; - } - - /** - * Returns the Raft log commit index. - * - * @return The Raft log commit index. - */ - long getCommitIndex() { - return commitIndex; - } - - /** - * Raft log builder. 
- */ - public static final class Builder { - private static final boolean DEFAULT_FLUSH_ON_COMMIT = false; - private static final String DEFAULT_NAME = "atomix"; - private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir"); - private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32; - private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; - private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024; - private static final double DEFAULT_INDEX_DENSITY = .005; - - private String name = DEFAULT_NAME; - private StorageLevel storageLevel = StorageLevel.DISK; - private File directory = new File(DEFAULT_DIRECTORY); - private JournalSerdes namespace; - private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; - private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; - private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; - private double indexDensity = DEFAULT_INDEX_DENSITY; - private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT; - - Builder() { - // Hidden on purpose + @Override + public JournalReader openReader(final long index) { + return openReader(index, JournalReader.Mode.ALL); } /** - * Sets the storage name. + * Opens a new journal reader with the given reader mode. * - * @param name The storage name. - * @return The storage builder. + * @param index The index from which to begin reading entries. + * @param mode The mode in which to read entries. + * @return The journal reader. */ - public Builder withName(String name) { - this.name = requireNonNull(name, "name cannot be null"); - return this; + @Override + public JournalReader openReader(final long index, final JournalReader.Mode mode) { + final var byteReader = switch (mode) { + case ALL -> journal.openReader(index); + case COMMITS -> journal.openCommitsReader(index); + }; + return new SegmentedJournalReader<>(byteReader, mapper); } - /** - * Sets the log storage level, returning the builder for method chaining. - *

    - * The storage level indicates how individual entries should be persisted in the journal. - * - * @param storageLevel The log storage level. - * @return The storage builder. - */ - public Builder withStorageLevel(StorageLevel storageLevel) { - this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); - return this; + @Override + public boolean isOpen() { + return open.get(); } - /** - * Sets the log directory, returning the builder for method chaining. - *

    - * The log will write segment files into the provided directory. - * - * @param directory The log directory. - * @return The storage builder. - * @throws NullPointerException If the {@code directory} is {@code null} - */ - public Builder withDirectory(String directory) { - return withDirectory(new File(requireNonNull(directory, "directory cannot be null"))); + @Override + public void close() { + if (open.compareAndExchange(true, false)) { + journal.close(); + } } /** - * Sets the log directory, returning the builder for method chaining. + * Compacts the journal up to the given index. *

    - * The log will write segment files into the provided directory. + * The semantics of compaction are not specified by this interface. * - * @param directory The log directory. - * @return The storage builder. - * @throws NullPointerException If the {@code directory} is {@code null} + * @param index The index up to which to compact the journal. */ - public Builder withDirectory(File directory) { - this.directory = requireNonNull(directory, "directory cannot be null"); - return this; + public void compact(final long index) { + journal.compact(index); } /** - * Sets the journal namespace, returning the builder for method chaining. + * Returns a new segmented journal builder. * - * @param namespace The journal serializer. - * @return The journal builder. + * @return A new segmented journal builder. */ - public Builder withNamespace(JournalSerdes namespace) { - this.namespace = requireNonNull(namespace, "namespace cannot be null"); - return this; + public static Builder builder() { + return new Builder<>(); } - /** - * Sets the maximum segment size in bytes, returning the builder for method chaining. - *

    - * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment - * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new - * segment and append new entries to that segment. - *

    - * By default, the maximum segment size is {@code 1024 * 1024 * 32}. - * - * @param maxSegmentSize The maximum segment size in bytes. - * @return The storage builder. - * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive - */ - public Builder withMaxSegmentSize(int maxSegmentSize) { - checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, - "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES); - this.maxSegmentSize = maxSegmentSize; - return this; - } + public static final class Builder { + private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder(); + private ByteBufMapper mapper; - /** - * Sets the maximum entry size in bytes, returning the builder for method chaining. - * - * @param maxEntrySize the maximum entry size in bytes - * @return the storage builder - * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive - */ - public Builder withMaxEntrySize(int maxEntrySize) { - checkArgument(maxEntrySize > 0, "maxEntrySize must be positive"); - this.maxEntrySize = maxEntrySize; - return this; - } + private Builder() { + // on purpose + } - /** - * Sets the maximum number of allows entries per segment, returning the builder for method chaining. - *

    - * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment - * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a - * new segment and append new entries to that segment. - *

    - * By default, the maximum entries per segment is {@code 1024 * 1024}. - * - * @param maxEntriesPerSegment The maximum number of entries allowed per segment. - * @return The storage builder. - * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries - * per segment - * @deprecated since 3.0.2 - */ - @Deprecated - public Builder withMaxEntriesPerSegment(int maxEntriesPerSegment) { - checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive"); - checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT, - "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT); - this.maxEntriesPerSegment = maxEntriesPerSegment; - return this; - } + /** + * Sets the journal name. + * + * @param name The journal name. + * @return The journal builder. + */ + public Builder withName(final String name) { + byteJournalBuilder.withName(name); + return this; + } - /** - * Sets the journal index density. - *

    - * The index density is the frequency at which the position of entries written to the journal will be recorded in an - * in-memory index for faster seeking. - * - * @param indexDensity the index density - * @return the journal builder - * @throws IllegalArgumentException if the density is not between 0 and 1 - */ - public Builder withIndexDensity(double indexDensity) { - checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1"); - this.indexDensity = indexDensity; - return this; - } + /** + * Sets the journal storage level. + *

    + * The storage level indicates how individual entries will be persisted in the journal. + * + * @param storageLevel The log storage level. + * @return The journal builder. + */ + public Builder withStorageLevel(final StorageLevel storageLevel) { + byteJournalBuilder.withStorageLevel(storageLevel); + return this; + } - /** - * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method - * chaining. - *

    - * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is - * committed in a given segment. - * - * @return The storage builder. - */ - public Builder withFlushOnCommit() { - return withFlushOnCommit(true); - } + /** + * Sets the journal storage directory. + *

    + * The journal will write segment files into the provided directory. + * + * @param directory The journal storage directory. + * @return The journal builder. + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final String directory) { + byteJournalBuilder.withDirectory(directory); + return this; + } - /** - * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method - * chaining. - *

    - * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is - * committed in a given segment. - * - * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. - * @return The storage builder. - */ - public Builder withFlushOnCommit(boolean flushOnCommit) { - this.flushOnCommit = flushOnCommit; - return this; - } + /** + * Sets the journal storage directory. + *

    + * The journal will write segment files into the provided directory. + * + * @param directory The journal storage directory. + * @return The journal builder. + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final File directory) { + byteJournalBuilder.withDirectory(directory); + return this; + } - /** - * Build the {@link SegmentedJournal}. - * - * @return A new {@link SegmentedJournal}. - */ - public SegmentedJournal build() { - return new SegmentedJournal<>( - name, - storageLevel, - directory, - namespace, - maxSegmentSize, - maxEntrySize, - maxEntriesPerSegment, - indexDensity, - flushOnCommit); + /** + * Sets the journal namespace. + * + * @param namespace The journal serializer. + * @return The journal builder. + * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead + */ + @Deprecated(forRemoval = true, since="9.0.3") + public Builder withNamespace(final JournalSerdes namespace) { + return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper()); + } + + /** + * Sets journal serializer. + * + * @param mapper Journal serializer + * @return The journal builder + */ + public Builder withMapper(final ByteBufMapper mapper) { + this.mapper = requireNonNull(mapper); + return this; + } + + /** + * Sets the maximum segment size in bytes. + *

    + * The maximum segment size dictates when journal should roll over to new segments. As entries are written + * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the + * journal will create a new segment and append new entries to that segment. + *

    + * By default, the maximum segment size is 32M. + * + * @param maxSegmentSize The maximum segment size in bytes. + * @return The storage builder. + * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive + */ + public Builder withMaxSegmentSize(final int maxSegmentSize) { + byteJournalBuilder.withMaxSegmentSize(maxSegmentSize); + return this; + } + + /** + * Sets the maximum entry size in bytes. + * + * @param maxEntrySize the maximum entry size in bytes + * @return the storage builder + * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive + */ + public Builder withMaxEntrySize(final int maxEntrySize) { + byteJournalBuilder.withMaxEntrySize(maxEntrySize); + return this; + } + + /** + * Sets the maximum number of entries per segment. + * + * @param maxEntriesPerSegment The maximum number of entries allowed per segment. + * @return The journal builder. + * @deprecated since 3.0.2, no longer used + */ + @Deprecated + public Builder withMaxEntriesPerSegment(final int maxEntriesPerSegment) { + // ignore + return this; + } + + /** + * Sets the journal index density. + *

    + * The index density is the frequency at which the position of entries written to the journal will be recorded + * in an in-memory index for faster seeking. + * + * @param indexDensity the index density + * @return the journal builder + * @throws IllegalArgumentException if the density is not between 0 and 1 + */ + public Builder withIndexDensity(final double indexDensity) { + byteJournalBuilder.withIndexDensity(indexDensity); + return this; + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *

    + * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an + * entry is committed in a given segment. + * + * @return The journal builder. + */ + public Builder withFlushOnCommit() { + return withFlushOnCommit(true); + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *

    + * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an + * entry is committed in a given segment. + * + * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. + * @return The journal builder. + */ + public Builder withFlushOnCommit(final boolean flushOnCommit) { + byteJournalBuilder.withFlushOnCommit(flushOnCommit); + return this; + } + + /** + * Build the {@link SegmentedJournal}. + * + * @return {@link SegmentedJournal} instance. + */ + public SegmentedJournal build() { + return new SegmentedJournal<>(byteJournalBuilder.build(), mapper); + } } - } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index a5deb6382e..f28390c84b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -1,6 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. - * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,134 +18,49 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; -import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; /** - * A {@link JournalReader} traversing all entries. + * A {@link JournalReader} backed by a {@link ByteBufReader}. 
*/ -sealed class SegmentedJournalReader implements JournalReader permits CommitsSegmentJournalReader { - // Marker non-null object for tryAdvance() - private static final @NonNull Object ADVANCED = new Object(); +final class SegmentedJournalReader implements JournalReader { + private final ByteBufMapper mapper; + private final ByteBufReader reader; - final SegmentedJournal journal; - - private JournalSegment currentSegment; - private JournalSegmentReader currentReader; - private long nextIndex; - - SegmentedJournalReader(final SegmentedJournal journal, final JournalSegment segment) { - this.journal = requireNonNull(journal); - currentSegment = requireNonNull(segment); - currentReader = segment.createReader(); - nextIndex = currentSegment.firstIndex(); + SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper mapper) { + this.reader = requireNonNull(reader); + this.mapper = requireNonNull(mapper); } @Override - public final long getFirstIndex() { - return journal.getFirstSegment().firstIndex(); + public long getFirstIndex() { + return reader.firstIndex(); } @Override - public final long getNextIndex() { - return nextIndex; + public long getNextIndex() { + return reader.nextIndex(); } @Override - public final void reset() { - currentReader.close(); - - currentSegment = journal.getFirstSegment(); - currentReader = currentSegment.createReader(); - nextIndex = currentSegment.firstIndex(); + public void reset() { + reader.reset(); } @Override - public final void reset(final long index) { - // If the current segment is not open, it has been replaced. Reset the segments. 
- if (!currentSegment.isOpen()) { - reset(); - } - - if (index < nextIndex) { - rewind(index); - } else if (index > nextIndex) { - while (index > nextIndex && tryAdvance()) { - // Nothing else - } - } else { - resetCurrentReader(index); - } - } - - private void resetCurrentReader(final long index) { - final var position = currentSegment.lookup(index - 1); - if (position != null) { - nextIndex = position.index(); - currentReader.setPosition(position.position()); - } else { - nextIndex = currentSegment.firstIndex(); - currentReader.setPosition(JournalSegmentDescriptor.BYTES); - } - while (nextIndex < index && tryAdvance()) { - // Nothing else - } - } - - /** - * Rewinds the journal to the given index. - */ - private void rewind(final long index) { - if (currentSegment.firstIndex() >= index) { - JournalSegment segment = journal.getSegment(index - 1); - if (segment != null) { - currentReader.close(); - - currentSegment = segment; - currentReader = currentSegment.createReader(); - } - } - - resetCurrentReader(index); + public void reset(final long index) { + reader.reset(index); } @Override - public T tryNext(final EntryMapper mapper) { - final var index = nextIndex; - var buf = currentReader.readBytes(index); - if (buf == null) { - final var nextSegment = journal.getNextSegment(currentSegment.firstIndex()); - if (nextSegment == null || nextSegment.firstIndex() != index) { - return null; - } - - currentReader.close(); - - currentSegment = nextSegment; - currentReader = currentSegment.createReader(); - buf = currentReader.readBytes(index); - if (buf == null) { - return null; - } - } - - final var entry = journal.serializer().deserialize(buf); - final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes())); - nextIndex = index + 1; - return ret; - } - - /** - * Try to move to the next entry. 
- * - * @return {@code true} if there was a next entry and this reader has moved to it - */ - final boolean tryAdvance() { - return tryNext((index, entry, size) -> ADVANCED) != null; + public @Nullable T tryNext(final EntryMapper entryMapper) { + return reader.tryNext( + (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes())) + ); } @Override - public final void close() { - currentReader.close(); - journal.closeReader(this); + public void close() { + reader.close(); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 71120891a1..7c331ccb24 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -1,5 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,94 +16,53 @@ */ package io.atomix.storage.journal; -import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; /** - * Raft log writer. + * A {@link JournalWriter} backed by a {@link ByteBufWriter}. 
*/ final class SegmentedJournalWriter implements JournalWriter { - private final SegmentedJournal journal; - private JournalSegment currentSegment; - private JournalSegmentWriter currentWriter; + private final ByteBufMapper mapper; + private final ByteBufWriter writer; - SegmentedJournalWriter(SegmentedJournal journal) { - this.journal = journal; - this.currentSegment = journal.getLastSegment(); - this.currentWriter = currentSegment.acquireWriter(); - } - - @Override - public long getLastIndex() { - return currentWriter.getLastIndex(); - } - - @Override - public long getNextIndex() { - return currentWriter.getNextIndex(); - } - - @Override - public void reset(long index) { - if (index > currentSegment.firstIndex()) { - currentSegment.releaseWriter(); - currentSegment = journal.resetSegments(index); - currentWriter = currentSegment.acquireWriter(); - } else { - truncate(index - 1); + SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper mapper) { + this.writer = requireNonNull(writer); + this.mapper = requireNonNull(mapper); } - journal.resetHead(index); - } - @Override - public void commit(long index) { - if (index > journal.getCommitIndex()) { - journal.setCommitIndex(index); - if (journal.isFlushOnCommit()) { - flush(); - } + @Override + public long getLastIndex() { + return writer.lastIndex(); } - } - @Override - public Indexed append(T entry) { - final var bytes = journal.serializer().serialize(entry); - var index = currentWriter.append(bytes); - if (index != null) { - return new Indexed<>(index, entry, bytes.readableBytes()); + @Override + public long getNextIndex() { + return writer.nextIndex(); } - // Slow path: we do not have enough capacity - currentWriter.flush(); - currentSegment.releaseWriter(); - currentSegment = journal.getNextSegment(); - currentWriter = currentSegment.acquireWriter(); - final var newIndex = verifyNotNull(currentWriter.append(bytes)); - return new Indexed<>(newIndex, entry, bytes.readableBytes()); - } - - @Override - 
public void truncate(long index) { - if (index < journal.getCommitIndex()) { - throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); + @Override + public void reset(final long index) { + writer.reset(index); } - // Delete all segments with first indexes greater than the given index. - while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) { - currentSegment.releaseWriter(); - journal.removeSegment(currentSegment); - currentSegment = journal.getLastSegment(); - currentWriter = currentSegment.acquireWriter(); + @Override + public void commit(final long index) { + writer.commit(index); } - // Truncate the current index. - currentWriter.truncate(index); + @Override + public Indexed append(final T entry) { + final var buf = mapper.objectToBytes(entry); + return new Indexed<>(writer.append(buf), entry, buf.readableBytes()); + } - // Reset segment readers. - journal.resetTail(index + 1); - } + @Override + public void truncate(final long index) { + writer.truncate(index); + } - @Override - public void flush() { - currentWriter.flush(); - } + @Override + public void flush() { + writer.flush(); + } } -- 2.36.6 From 2cdb3460a95fae6dd556ca9e0e20c042ec63ed22 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 20:59:16 +0200 Subject: [PATCH 10/16] Refactor Journal interface There are two basic issues: - isOpen() is completely unused - close() is inherited from Closeable, not AutoCloseable Address these and move implementation of openReader(long), so it is canonical. 
JIRA: CONTROLLER-2100 Change-Id: I1468ed5a3e9ee1abefe35a4bfaf653696763907f Signed-off-by: Robert Varga --- .../io/atomix/storage/journal/Journal.java | 60 +++++++++---------- .../storage/journal/SegmentedJournal.java | 11 +--- 2 files changed, 28 insertions(+), 43 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java index 5e37c12222..93ae0a565b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java @@ -15,46 +15,40 @@ */ package io.atomix.storage.journal; -import java.io.Closeable; +import io.atomix.storage.journal.JournalReader.Mode; /** * Journal. * * @author Jordan Halterman */ -public interface Journal extends Closeable { +public interface Journal extends AutoCloseable { + /** + * Returns the journal writer. + * + * @return The journal writer. + */ + JournalWriter writer(); - /** - * Returns the journal writer. - * - * @return The journal writer. - */ - JournalWriter writer(); + /** + * Opens a new journal reader with {@link Mode#ALL}. + * + * @param index The index at which to start the reader. + * @return A new journal reader. + */ + default JournalReader openReader(final long index) { + return openReader(index, Mode.ALL); + } - /** - * Opens a new journal reader. - * - * @param index The index at which to start the reader. - * @return A new journal reader. - */ - JournalReader openReader(long index); + /** + * Opens a new journal reader with specified mode. + * + * @param index The index at which to start the reader. + * @param mode the reader mode + * @return A new journal reader. + */ + JournalReader openReader(long index, Mode mode); - /** - * Opens a new journal reader. - * - * @param index The index at which to start the reader. - * @param mode the reader mode - * @return A new journal reader. 
- */ - JournalReader openReader(long index, JournalReader.Mode mode); - - /** - * Returns a boolean indicating whether the journal is open. - * - * @return Indicates whether the journal is open. - */ - boolean isOpen(); - - @Override - void close(); + @Override + void close(); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index cd07492692..8f0464ebe3 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -19,13 +19,11 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; import java.io.File; -import java.util.concurrent.atomic.AtomicBoolean; /** * Segmented journal. */ public final class SegmentedJournal implements Journal { - private final AtomicBoolean open = new AtomicBoolean(true); private final SegmentedByteBufJournal journal; private final SegmentedJournalWriter writer; private final ByteBufMapper mapper; @@ -62,16 +60,9 @@ public final class SegmentedJournal implements Journal { return new SegmentedJournalReader<>(byteReader, mapper); } - @Override - public boolean isOpen() { - return open.get(); - } - @Override public void close() { - if (open.compareAndExchange(true, false)) { - journal.close(); - } + journal.close(); } /** -- 2.36.6 From 9f1702771cc777690b18ecffd0e7059dcf574475 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 23:02:42 +0200 Subject: [PATCH 11/16] Use getCompactableIndex() to unmask firstIndex We have duplicated code which can easily use the result of getCompactableIndex() and work on top of that. 
JIRA: CONTROLLER-2100 Change-Id: Ib580853424d445d82c448a86d82a706f4bba50d2 Signed-off-by: Robert Varga --- .../storage/journal/SegmentedByteBufJournal.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index 3ae64ea82e..c3e4b2bad1 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -370,8 +370,8 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { * @return indicates whether a segment can be removed from the journal */ public boolean isCompactable(final long index) { - final var segmentEntry = segments.floorEntry(index); - return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0; + final var firstIndex = getCompactableIndex(index); + return firstIndex != 0 && !segments.headMap(firstIndex).isEmpty(); } /** @@ -393,14 +393,14 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { * @param index The index up to which to compact the journal. 
*/ public void compact(final long index) { - final var segmentEntry = segments.floorEntry(index); - if (segmentEntry != null) { - final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex()); + final var firstIndex = getCompactableIndex(index); + if (firstIndex != 0) { + final var compactSegments = segments.headMap(firstIndex); if (!compactSegments.isEmpty()) { LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); compactSegments.values().forEach(JournalSegment::delete); compactSegments.clear(); - resetHead(segmentEntry.getValue().firstIndex()); + resetHead(firstIndex); } } } -- 2.36.6 From 204c9c2f7dff16dc85b2a211c88347aeaa412591 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 23:24:33 +0200 Subject: [PATCH 12/16] Do not call nextSegment() from {first,last}Segment() We always have at least one segment and nextSegment() is the slow path here (requiring synchronization). Just assume {first,last}Entry() returns non-null. This assumption is violated while we are removing a segment, as we could be removing the last segment. In that case we need to re-create it. Also improve method names and documentation a bit. 
JIRA: CONTROLLER-2115 Change-Id: I74bb1578e73828666ee795522a68f14ad112ec75 Signed-off-by: Robert Varga --- .../journal/SegmentedByteBufJournal.java | 118 +++++++++--------- .../journal/SegmentedByteBufReader.java | 2 +- .../journal/SegmentedByteBufWriter.java | 2 +- 3 files changed, 61 insertions(+), 61 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index c3e4b2bad1..dc837eaabe 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.BiFunction; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { private final boolean flushOnCommit; private final @NonNull ByteBufWriter writer; + // null when closed private JournalSegment currentSegment; private volatile long commitIndex; @@ -63,7 +65,13 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { this.maxEntrySize = maxEntrySize; this.indexDensity = indexDensity; this.flushOnCommit = flushOnCommit; - open(); + + // Load existing log segments from disk. + for (var segment : loadSegments()) { + segments.put(segment.firstIndex(), segment); + } + currentSegment = ensureLastSegment(); + writer = new SegmentedByteBufWriter(this); } @@ -108,23 +116,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { return openReader(index, SegmentedCommitsByteBufReader::new); } - /** - * Opens the segments. - */ - private synchronized void open() { - // Load existing log segments from disk. 
- for (var segment : loadSegments()) { - segments.put(segment.firstIndex(), segment); - } - // If a segment doesn't already exist, create an initial segment starting at index 1. - if (segments.isEmpty()) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = segments.lastEntry().getValue(); - } - } - /** * Asserts that the manager is open. * @@ -143,19 +134,6 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { } } - /** - * Resets the current segment, creating a new segment if necessary. - */ - private synchronized void resetCurrentSegment() { - final var lastSegment = lastSegment(); - if (lastSegment == null) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = lastSegment; - } - } - /** * Resets and returns the first segment in the journal. * @@ -173,10 +151,9 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { segments.values().forEach(JournalSegment::delete); segments.clear(); - - currentSegment = createSegment(1, index); - segments.put(index, currentSegment); - return currentSegment; + final var newSegment = createInitialSegment(); + currentSegment = newSegment; + return newSegment; } /** @@ -186,8 +163,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { */ JournalSegment firstSegment() { assertOpen(); - final var firstEntry = segments.firstEntry(); - return firstEntry != null ? firstEntry.getValue() : nextSegment(); + return segments.firstEntry().getValue(); } /** @@ -197,8 +173,18 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { */ JournalSegment lastSegment() { assertOpen(); - final var lastEntry = segments.lastEntry(); - return lastEntry != null ? lastEntry.getValue() : nextSegment(); + return segments.lastEntry().getValue(); + } + + /** + * Returns the segment following the segment with the given ID. 
+ * + * @param index The segment index with which to look up the next segment. + * @return The next segment for the given index, or {@code null} if no such segment exists + */ + @Nullable JournalSegment tryNextSegment(final long index) { + final var higherEntry = segments.higherEntry(index); + return higherEntry != null ? higherEntry.getValue() : null; } /** @@ -207,26 +193,17 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { * @return The next segment. * @throws IllegalStateException if the segment manager is not open */ - synchronized JournalSegment nextSegment() { + synchronized @NonNull JournalSegment createNextSegment() { assertOpen(); assertDiskSpace(); + // FIXME: lastSegment should equal currentSegment. We should be asserting that. final var index = currentSegment.lastIndex() + 1; final var lastSegment = lastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the segment following the segment with the given ID. - * - * @param index The segment index with which to look up the next segment. - * @return The next segment for the given index. - */ - JournalSegment nextSegment(final long index) { - final var higherEntry = segments.higherEntry(index); - return higherEntry != null ? higherEntry.getValue() : null; + final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index); + segments.put(index, nextSegment); + currentSegment = nextSegment; + return nextSegment; } /** @@ -255,18 +232,24 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { synchronized void removeSegment(final JournalSegment segment) { segments.remove(segment.firstIndex()); segment.delete(); - resetCurrentSegment(); + + // Reset current segment to last segment + currentSegment = ensureLastSegment(); } /** * Creates a new segment. 
+ * + * @param segmentId the segment ID + * @param firstIndex index of first entry + * @return A new segment */ - JournalSegment createSegment(final long id, final long index) { + private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) { final JournalSegmentFile file; try { file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() - .withId(id) - .withIndex(index) + .withId(segmentId) + .withIndex(firstIndex) .withMaxSegmentSize(maxSegmentSize) // FIXME: propagate maxEntries .withMaxEntries(Integer.MAX_VALUE) @@ -281,12 +264,29 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { return segment; } + private @NonNull JournalSegment createInitialSegment() { + final var segment = createSegment(1, 1); + segments.put(1L, segment); + return segment; + } + + /** + * Make sure there is a last segment and return it. + * + * @return the last segment + */ + private JournalSegment ensureLastSegment() { + final var lastEntry = segments.lastEntry(); + // if there is no segment, create an initial segment starting at index 1. + return lastEntry != null ? lastEntry.getValue() : createInitialSegment(); + } + /** * Loads all segments from disk. * * @return A collection of segments for the log. */ - protected Collection loadSegments() { + private Collection loadSegments() { // Ensure log directories are created. 
directory.mkdirs(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java index d164676845..d7eb68e847 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java @@ -125,7 +125,7 @@ sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCo ByteBuf tryAdvance(final long index) { var buf = currentReader.readBytes(); if (buf == null) { - final var nextSegment = journal.nextSegment(currentSegment.firstIndex()); + final var nextSegment = journal.tryNextSegment(currentSegment.firstIndex()); if (nextSegment == null || nextSegment.firstIndex() != index) { return null; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 7e92815f34..0d942dd085 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -77,7 +77,7 @@ final class SegmentedByteBufWriter implements ByteBufWriter { // Slow path: we do not have enough capacity currentWriter.flush(); currentSegment.releaseWriter(); - currentSegment = journal.nextSegment(); + currentSegment = journal.createNextSegment(); currentWriter = currentSegment.acquireWriter(); return verifyNotNull(currentWriter.append(buf)); } -- 2.36.6 From c60f40bb075bee81e6a3977a620a1b7b4e0550cd Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 8 May 2024 00:49:53 +0200 Subject: [PATCH 13/16] Fix segment population There has been a mixup in previous patch, which ends up touching the segments from loadSegments(). Fix that up. 
JIRA: CONTROLLER-2115 Change-Id: I7932e691c9751a79ab2b407f9de4049721680c6d Signed-off-by: Robert Varga --- .../java/io/atomix/storage/journal/SegmentedByteBufJournal.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index dc837eaabe..074a5dd182 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -309,7 +309,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { // Add the segment to the segments list. final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); - segments.put(segment.firstIndex(), segment); + segmentsMap.put(segment.firstIndex(), segment); } } -- 2.36.6 From 7997055cc3a82e0bfe753a4e2dbcd8af59d9113d Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 7 May 2024 15:11:23 +0200 Subject: [PATCH 14/16] Refactor SegmentedJournalWriter.reset() We have two methods reset() and truncate(), both of which take an index, without a real documented distinction. Document reset() as taking the index to read/write next and introduce a guard against attempts to use reset(0) -- as 0 is not a valid next index. 
JIRA: CONTROLLER-2100 Change-Id: I8a6b366fdb0827ab3cd5a494e7e9f5a741983264 Signed-off-by: Robert Varga --- .../atomix/storage/journal/ByteBufReader.java | 2 +- .../atomix/storage/journal/ByteBufWriter.java | 5 +-- .../atomix/storage/journal/JournalReader.java | 2 +- .../atomix/storage/journal/JournalWriter.java | 4 ++- .../journal/SegmentedByteBufWriter.java | 33 ++++++++++++------- .../journal/SegmentedJournalWriter.java | 10 +++--- 6 files changed, 34 insertions(+), 22 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java index 1ebe81eef6..205857533d 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java @@ -71,7 +71,7 @@ public interface ByteBufReader extends AutoCloseable { /** * Resets the reader to the given index. * - * @param index The index to which to reset the reader + * @param index the next index to read */ void reset(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java index 7211a8844d..2f85cc6163 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -56,9 +56,9 @@ public interface ByteBufWriter { /** * Resets the head of the journal to the given index. * - * @param index The index to which to reset the head of the journal + * @param index the next index to write + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ - // FIXME: reconcile with reader's reset and truncate() // FIXME: throws IOException void reset(long index); @@ -66,6 +66,7 @@ public interface ByteBufWriter { * Truncates the log to the given index. 
* * @param index The index to which to truncate the log. + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reset() // FIXME: throws IOException diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java index 635f6248c4..8f49aa2ebc 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -88,7 +88,7 @@ public interface JournalReader extends AutoCloseable { /** * Resets the reader to the given index. * - * @param index The index to which to reset the reader. + * @param index the next index to read */ void reset(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index ba7c5821aa..ae65778185 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -55,7 +55,8 @@ public interface JournalWriter { /** * Resets the head of the journal to the given index. * - * @param index the index to which to reset the head of the journal + * @param index the next index to write + * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reader's reset and truncate() void reset(long index); @@ -64,6 +65,7 @@ public interface JournalWriter { * Truncates the log to the given index. * * @param index The index to which to truncate the log. 
+ * @throws IndexOutOfBoundsException if the journal cannot be reset to specified index */ // FIXME: reconcile with reset() void truncate(long index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 0d942dd085..42c00e59fd 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -46,18 +46,6 @@ final class SegmentedByteBufWriter implements ByteBufWriter { return currentWriter.getNextIndex(); } - @Override - public void reset(final long index) { - if (index > currentSegment.firstIndex()) { - currentSegment.releaseWriter(); - currentSegment = journal.resetSegments(index); - currentWriter = currentSegment.acquireWriter(); - } else { - truncate(index - 1); - } - journal.resetHead(index); - } - @Override public void commit(final long index) { if (index > journal.getCommitIndex()) { @@ -82,12 +70,33 @@ final class SegmentedByteBufWriter implements ByteBufWriter { return verifyNotNull(currentWriter.append(buf)); } + @Override + public void reset(final long index) { + final long commitIndex = journal.getCommitIndex(); + if (index <= commitIndex) { + // also catches index == 0, which is not a valid next index + throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex); + } + + if (index > currentSegment.firstIndex()) { + currentSegment.releaseWriter(); + currentSegment = journal.resetSegments(index); + currentWriter = currentSegment.acquireWriter(); + } else { + checkedTruncate(index - 1); + } + journal.resetHead(index); + } + @Override public void truncate(final long index) { if (index < journal.getCommitIndex()) { throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); } + checkedTruncate(index); + } + private void checkedTruncate(final 
long index) { // Delete all segments with first indexes greater than the given index. while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) { currentSegment.releaseWriter(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 7c331ccb24..80c352ead0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -40,11 +40,6 @@ final class SegmentedJournalWriter implements JournalWriter { return writer.nextIndex(); } - @Override - public void reset(final long index) { - writer.reset(index); - } - @Override public void commit(final long index) { writer.commit(index); @@ -56,6 +51,11 @@ final class SegmentedJournalWriter implements JournalWriter { return new Indexed<>(writer.append(buf), entry, buf.readableBytes()); } + @Override + public void reset(final long index) { + writer.reset(index); + } + @Override public void truncate(final long index) { writer.truncate(index); -- 2.36.6 From 08f3407005c878653d35d33bb28aca039ec60b0e Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 07:16:46 +0200 Subject: [PATCH 15/16] Maintain last known position in JournalIndex A JournalIndex is bound to see all entries that get written into a file, hence it can easily maintain that Position. This simplifies JournalSegmentWriter state maintenance, as we can consult the index for JournalSegmentWriter.get{Last,Next}Index(). Also expand test suite to make explicit assertions on returns. 
JIRA: CONTROLLER-2100 Change-Id: I57731f53cacfdfd548ab59c06706d41a013e7908 Signed-off-by: Robert Varga --- .../io/atomix/storage/journal/Indexed.java | 5 + .../storage/journal/JournalSegmentWriter.java | 23 ++-- .../journal/SegmentedByteBufWriter.java | 16 +-- .../storage/journal/index/JournalIndex.java | 11 +- .../journal/index/SparseJournalIndex.java | 28 ++++- .../journal/index/SparseJournalIndexTest.java | 113 ++++++++++-------- 6 files changed, 117 insertions(+), 79 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java index 5bf7e6f454..38b51d9d3c 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Indexed.java @@ -19,6 +19,7 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; +import io.atomix.storage.journal.index.Position; import org.eclipse.jdt.annotation.NonNullByDefault; /** @@ -37,6 +38,10 @@ public record Indexed(long index, E entry, int size) { requireNonNull(entry); } + Indexed(final Position position, final E entry, final int size) { + this(position.index(), entry, size); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("index", index).add("entry", entry).toString(); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index dbf6aec214..b18371f044 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -19,6 +19,7 @@ import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import static java.util.Objects.requireNonNull; import io.atomix.storage.journal.index.JournalIndex; +import 
io.atomix.storage.journal.index.Position; import io.netty.buffer.ByteBuf; import java.nio.MappedByteBuffer; import org.eclipse.jdt.annotation.NonNull; @@ -36,7 +37,6 @@ final class JournalSegmentWriter { final int maxEntrySize; private int currentPosition; - private Long lastIndex; JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize, final JournalIndex index) { @@ -54,7 +54,6 @@ final class JournalSegmentWriter { index = previous.index; maxSegmentSize = previous.maxSegmentSize; maxEntrySize = previous.maxEntrySize; - lastIndex = previous.lastIndex; currentPosition = previous.currentPosition; this.fileWriter = requireNonNull(fileWriter); } @@ -65,7 +64,8 @@ final class JournalSegmentWriter { * @return The last written index. */ long getLastIndex() { - return lastIndex != null ? lastIndex : segment.firstIndex() - 1; + final var last = index.last(); + return last != null ? last.index() : segment.firstIndex() - 1; } /** @@ -74,7 +74,8 @@ final class JournalSegmentWriter { * @return The next index to be written. */ long getNextIndex() { - return lastIndex != null ? lastIndex + 1 : segment.firstIndex(); + final var last = index.last(); + return last != null ? last.index() + 1 : segment.firstIndex(); } /** @@ -83,7 +84,7 @@ final class JournalSegmentWriter { * @param buf binary data to append * @return The index of appended data, or {@code null} if segment has no space */ - Long append(final ByteBuf buf) { + Position append(final ByteBuf buf) { final var length = buf.readableBytes(); if (length > maxEntrySize) { throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes (" @@ -113,10 +114,7 @@ final class JournalSegmentWriter { // Update the last entry with the correct index/term/length. 
currentPosition = nextPosition; - lastIndex = index; - this.index.index(index, position); - - return index; + return this.index.index(index, position); } /** @@ -149,9 +147,7 @@ final class JournalSegmentWriter { break; } - lastIndex = nextIndex; - this.index.index(nextIndex, currentPosition); - nextIndex++; + this.index.index(nextIndex++, currentPosition); // Update the current position for indexing. currentPosition += HEADER_BYTES + buf.readableBytes(); @@ -169,9 +165,6 @@ final class JournalSegmentWriter { return; } - // Reset the last written - lastIndex = null; - // Truncate the index. this.index.truncate(index); diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index 42c00e59fd..e16e6446eb 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -16,7 +16,6 @@ */ package io.atomix.storage.journal; -import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import io.netty.buffer.ByteBuf; @@ -57,17 +56,18 @@ final class SegmentedByteBufWriter implements ByteBufWriter { } @Override - public long append(final ByteBuf buf) { - var index = currentWriter.append(buf); - if (index != null) { - return index; - } - // Slow path: we do not have enough capacity + public long append(final ByteBuf bytes) { + final var position = currentWriter.append(bytes); + return position != null ? 
position.index() : appendToNextSegment(bytes); + } + + // Slow path: we do not have enough capacity + private long appendToNextSegment(final ByteBuf bytes) { currentWriter.flush(); currentSegment.releaseWriter(); currentSegment = journal.createNextSegment(); currentWriter = currentSegment.acquireWriter(); - return verifyNotNull(currentWriter.append(buf)); + return currentWriter.append(bytes).index(); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/index/JournalIndex.java b/atomix-storage/src/main/java/io/atomix/storage/journal/index/JournalIndex.java index 8608e00fc6..3c63141246 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/index/JournalIndex.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/index/JournalIndex.java @@ -16,6 +16,7 @@ */ package io.atomix.storage.journal.index; +import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; /** @@ -27,8 +28,16 @@ public interface JournalIndex { * * @param index the index for which to add the entry * @param position the position of the given index + * @return A {@link Position} */ - void index(long index, int position); + @NonNull Position index(long index, int position); + + /** + * Return the last position known to this index. + * + * @return the last position known to this index + */ + @Nullable Position last(); /** * Looks up the position of the given index. 
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java b/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java index 6da3ba4ed4..b1c5e2ba1b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/index/SparseJournalIndex.java @@ -18,6 +18,7 @@ package io.atomix.storage.journal.index; import com.google.common.base.MoreObjects; import java.util.TreeMap; +import org.eclipse.jdt.annotation.Nullable; /** * A {@link JournalIndex} maintaining target density. @@ -25,8 +26,11 @@ import java.util.TreeMap; public final class SparseJournalIndex implements JournalIndex { private static final int MIN_DENSITY = 1000; - private final int density; private final TreeMap positions = new TreeMap<>(); + private final int density; + + // Last known position. May not be accurate immediately after a truncate() or construction + private @Nullable Position last; public SparseJournalIndex() { density = MIN_DENSITY; @@ -37,10 +41,18 @@ public final class SparseJournalIndex implements JournalIndex { } @Override - public void index(final long index, final int position) { + public Position index(final long index, final int position) { + final var newLast = new Position(index, position); + last = newLast; if (index % density == 0) { positions.put(index, position); } + return newLast; + } + + @Override + public Position last() { + return last; } @Override @@ -50,8 +62,16 @@ public final class SparseJournalIndex implements JournalIndex { @Override public Position truncate(final long index) { - positions.tailMap(index, false).clear(); - return Position.ofNullable(positions.lastEntry()); + // Clear all indexes unto and including index, saving the first removed entry + final var tailMap = positions.tailMap(index, true); + final var firstRemoved = tailMap.firstEntry(); + tailMap.clear(); + + // Update last position to the last 
entry, but make sure to return a pointer to index if that is what we have + // indexed. + final var newLast = Position.ofNullable(positions.lastEntry()); + last = newLast; + return firstRemoved != null && firstRemoved.getKey() == index ? new Position(firstRemoved) : newLast; } @Override diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/index/SparseJournalIndexTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/index/SparseJournalIndexTest.java index b7cd38a1a4..204fdb86a3 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/index/SparseJournalIndexTest.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/index/SparseJournalIndexTest.java @@ -15,61 +15,72 @@ */ package io.atomix.storage.journal.index; -import org.junit.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import org.junit.jupiter.api.Test; /** * Sparse journal index test. 
*/ -public class SparseJournalIndexTest { - @Test - public void testSparseJournalIndex() throws Exception { - JournalIndex index = new SparseJournalIndex(.2); - assertNull(index.lookup(1)); - index.index(1, 2); - assertNull(index.lookup(1)); - index.index(2, 4); - index.index(3, 6); - index.index(4, 8); - index.index(5, 10); - assertEquals(new Position(5, 10), index.lookup(5)); - index.index(6, 12); - index.index(7, 14); - index.index(8, 16); - assertEquals(new Position(5, 10), index.lookup(8)); - index.index(9, 18); - index.index(10, 20); - assertEquals(new Position(10, 20), index.lookup(10)); - index.truncate(8); - assertEquals(new Position(5, 10), index.lookup(8)); - assertEquals(new Position(5, 10), index.lookup(10)); - index.truncate(4); - assertNull(index.lookup(4)); - assertNull(index.lookup(8)); +class SparseJournalIndexTest { + private final SparseJournalIndex sparseIndex = new SparseJournalIndex(.2); - index = new SparseJournalIndex(.2); - assertNull(index.lookup(100)); - index.index(101, 2); - assertNull(index.lookup(1)); - index.index(102, 4); - index.index(103, 6); - index.index(104, 8); - index.index(105, 10); - assertEquals(new Position(105, 10), index.lookup(105)); - index.index(106, 12); - index.index(107, 14); - index.index(108, 16); - assertEquals(new Position(105, 10), index.lookup(108)); - index.index(109, 18); - index.index(110, 20); - assertEquals(new Position(110, 20), index.lookup(110)); - index.truncate(108); - assertEquals(new Position(105, 10), index.lookup(108)); - assertEquals(new Position(105, 10), index.lookup(110)); - index.truncate(104); - assertNull(index.lookup(104)); - assertNull(index.lookup(108)); - } + @Test + void firstTest() throws Exception { + assertNull(sparseIndex.lookup(1)); + assertIndex(1, 2); + assertNull(sparseIndex.lookup(1)); + assertIndex(2, 4); + assertIndex(3, 6); + assertIndex(4, 8); + assertIndex(5, 10); + assertEquals(new Position(5, 10), sparseIndex.lookup(5)); + assertIndex(6, 12); + assertIndex(7, 14); + 
assertIndex(8, 16); + assertEquals(new Position(5, 10), sparseIndex.lookup(8)); + assertIndex(9, 18); + assertIndex(10, 20); + assertEquals(new Position(10, 20), sparseIndex.lookup(10)); + assertEquals(new Position(5, 10), sparseIndex.truncate(8)); + assertEquals(new Position(5, 10), sparseIndex.lookup(5)); + assertEquals(new Position(5, 10), sparseIndex.lookup(8)); + assertEquals(new Position(5, 10), sparseIndex.lookup(10)); + assertEquals(new Position(5, 10), sparseIndex.truncate(5)); + assertNull(sparseIndex.lookup(5)); + assertNull(sparseIndex.lookup(8)); + assertNull(sparseIndex.truncate(4)); + assertNull(sparseIndex.lookup(4)); + assertNull(sparseIndex.lookup(8)); + } + + @Test + void secondTest() { + assertNull(sparseIndex.lookup(100)); + assertIndex(101, 2); + assertNull(sparseIndex.lookup(1)); + assertIndex(102, 4); + assertIndex(103, 6); + assertIndex(104, 8); + assertIndex(105, 10); + assertEquals(new Position(105, 10), sparseIndex.lookup(105)); + assertIndex(106, 12); + assertIndex(107, 14); + assertIndex(108, 16); + assertEquals(new Position(105, 10), sparseIndex.lookup(108)); + assertIndex(109, 18); + assertIndex(110, 20); + assertEquals(new Position(110, 20), sparseIndex.lookup(110)); + assertEquals(new Position(105, 10), sparseIndex.truncate(108)); + assertEquals(new Position(105, 10), sparseIndex.lookup(108)); + assertEquals(new Position(105, 10), sparseIndex.lookup(110)); + assertNull(sparseIndex.truncate(104)); + assertNull(sparseIndex.lookup(104)); + assertNull(sparseIndex.lookup(108)); + } + + private void assertIndex(final long index, final int position) { + assertEquals(new Position(index, position), sparseIndex.index(index, position)); + } } -- 2.36.6 From 794f28ea9f2c22dfb7042266b71ada659a920ab7 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 6 May 2024 21:51:31 +0200 Subject: [PATCH 16/16] Move JournalWriter.getLastIndex() Last written index is a property of a particular Journal, not of a writer -- and now that we maintain this in 
the index, we can make shortcuts. This also removes a source of confusion, as we have two methods taking a 'long index' and performing some writer adjustments: - reset(long) is equivalent of setNextIndex() - truncate(long) is equivalent of setLastIndex() Change-Id: I1bc4b5d1b3052c2b35808b8ec4ea2d88dcfca593 Signed-off-by: Robert Varga --- .../storage/journal/ByteBufJournal.java | 7 ++++ .../atomix/storage/journal/ByteBufWriter.java | 7 ---- .../io/atomix/storage/journal/Journal.java | 7 ++++ .../storage/journal/JournalSegment.java | 7 ++-- .../storage/journal/JournalSegmentWriter.java | 38 +++++++------------ .../atomix/storage/journal/JournalWriter.java | 7 ---- .../journal/SegmentedByteBufJournal.java | 5 +++ .../journal/SegmentedByteBufWriter.java | 7 +--- .../storage/journal/SegmentedJournal.java | 5 +++ .../journal/SegmentedJournalWriter.java | 5 --- .../storage/journal/AbstractJournalTest.java | 18 ++++++--- .../akka/segjournal/DataJournalV0.java | 11 +++--- .../segjournal/SegmentedJournalActor.java | 10 +++-- 13 files changed, 68 insertions(+), 66 deletions(-) diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java index baaa6b0ba9..dc3e75bcde 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java @@ -23,6 +23,13 @@ import org.eclipse.jdt.annotation.NonNullByDefault; */ @NonNullByDefault public interface ByteBufJournal extends AutoCloseable { + /** + * Return the index of the last entry in the journal. + * + * @return the last index, or zero if there are no entries. + */ + long lastIndex(); + /** * Returns the journal writer. 
* diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java index 2f85cc6163..58f9d35291 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault; */ @NonNullByDefault public interface ByteBufWriter { - /** - * Returns the last written index. - * - * @return The last written index - */ - long lastIndex(); - /** * Returns the next index to be written. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java index 93ae0a565b..39be7d4d5f 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java @@ -23,6 +23,13 @@ import io.atomix.storage.journal.JournalReader.Mode; * @author Jordan Halterman */ public interface Journal extends AutoCloseable { + /** + * Return the index of the last entry in the journal. + * + * @return the last index, or zero if there are no entries. + */ + long lastIndex(); + /** * Returns the journal writer. 
* diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index 2128b87e20..844a1c9214 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -39,12 +39,12 @@ import org.slf4j.LoggerFactory; final class JournalSegment { private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class); + private final Set readers = ConcurrentHashMap.newKeySet(); + private final AtomicInteger references = new AtomicInteger(); private final JournalSegmentFile file; private final StorageLevel storageLevel; private final int maxEntrySize; private final JournalIndex journalIndex; - private final Set readers = ConcurrentHashMap.newKeySet(); - private final AtomicInteger references = new AtomicInteger(); private JournalSegmentWriter writer; private boolean open = true; @@ -83,7 +83,8 @@ final class JournalSegment { * @return The last index in the segment. */ long lastIndex() { - return writer.getLastIndex(); + final var lastPosition = journalIndex.last(); + return lastPosition != null ? 
lastPosition.index() : firstIndex() - 1; } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index b18371f044..70cc790389 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -18,6 +18,7 @@ package io.atomix.storage.journal; import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES; import static java.util.Objects.requireNonNull; +import io.atomix.storage.journal.StorageException.TooLarge; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; import io.netty.buffer.ByteBuf; @@ -32,17 +33,17 @@ final class JournalSegmentWriter { private final FileWriter fileWriter; final @NonNull JournalSegment segment; - private final @NonNull JournalIndex index; + private final @NonNull JournalIndex journalIndex; final int maxSegmentSize; final int maxEntrySize; private int currentPosition; JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize, - final JournalIndex index) { + final JournalIndex journalIndex) { this.fileWriter = requireNonNull(fileWriter); this.segment = requireNonNull(segment); - this.index = requireNonNull(index); + this.journalIndex = requireNonNull(journalIndex); maxSegmentSize = segment.file().maxSize(); this.maxEntrySize = maxEntrySize; // adjust lastEntry value @@ -51,31 +52,21 @@ final class JournalSegmentWriter { JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) { segment = previous.segment; - index = previous.index; + journalIndex = previous.journalIndex; maxSegmentSize = previous.maxSegmentSize; maxEntrySize = previous.maxEntrySize; currentPosition = previous.currentPosition; this.fileWriter = requireNonNull(fileWriter); } - /** - * Returns the last written 
index. - * - * @return The last written index. - */ - long getLastIndex() { - final var last = index.last(); - return last != null ? last.index() : segment.firstIndex() - 1; - } - /** * Returns the next index to be written. * * @return The next index to be written. */ - long getNextIndex() { - final var last = index.last(); - return last != null ? last.index() + 1 : segment.firstIndex(); + long nextIndex() { + final var lastPosition = journalIndex.last(); + return lastPosition != null ? lastPosition.index() + 1 : segment.firstIndex(); } /** @@ -87,12 +78,11 @@ final class JournalSegmentWriter { Position append(final ByteBuf buf) { final var length = buf.readableBytes(); if (length > maxEntrySize) { - throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes (" - + maxEntrySize + ")"); + throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")"); } // Store the entry index. - final long index = getNextIndex(); + final long index = nextIndex(); final int position = currentPosition; // check space available @@ -114,7 +104,7 @@ final class JournalSegmentWriter { // Update the last entry with the correct index/term/length. currentPosition = nextPosition; - return this.index.index(index, position); + return journalIndex.index(index, position); } /** @@ -147,7 +137,7 @@ final class JournalSegmentWriter { break; } - this.index.index(nextIndex++, currentPosition); + journalIndex.index(nextIndex++, currentPosition); // Update the current position for indexing. currentPosition += HEADER_BYTES + buf.readableBytes(); @@ -161,12 +151,12 @@ final class JournalSegmentWriter { */ void truncate(final long index) { // If the index is greater than or equal to the last index, skip the truncate. - if (index >= getLastIndex()) { + if (index >= segment.lastIndex()) { return; } // Truncate the index. 
- this.index.truncate(index); + journalIndex.truncate(index); if (index < segment.firstIndex()) { // Reset the writer to the first entry. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index ae65778185..0561c99fe0 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -23,13 +23,6 @@ import org.eclipse.jdt.annotation.NonNull; * @author Jordan Halterman */ public interface JournalWriter { - /** - * Returns the last written index. - * - * @return The last written index. - */ - long getLastIndex(); - /** * Returns the next index to be written. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java index 074a5dd182..4698a5fd60 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -92,6 +92,11 @@ public final class SegmentedByteBufJournal implements ByteBufJournal { .sum(); } + @Override + public long lastIndex() { + return lastSegment().lastIndex(); + } + @Override public ByteBufWriter writer() { return writer; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java index e16e6446eb..c51e8a2ffa 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -35,14 +35,9 @@ final class SegmentedByteBufWriter implements ByteBufWriter { currentWriter = currentSegment.acquireWriter(); } - @Override - public long lastIndex() { - return 
currentWriter.getLastIndex(); - } - @Override public long nextIndex() { - return currentWriter.getNextIndex(); + return currentWriter.nextIndex(); } @Override diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 8f0464ebe3..f6c976742c 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -34,6 +34,11 @@ public final class SegmentedJournal implements Journal { writer = new SegmentedJournalWriter<>(journal.writer(), mapper); } + @Override + public long lastIndex() { + return journal.lastIndex(); + } + @Override public JournalWriter writer() { return writer; diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 80c352ead0..11aa6c2431 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -30,11 +30,6 @@ final class SegmentedJournalWriter implements JournalWriter { this.mapper = requireNonNull(mapper); } - @Override - public long getLastIndex() { - return writer.lastIndex(); - } - @Override public long getNextIndex() { return writer.nextIndex(); diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java index d4bc43d9b2..026e58df7b 100644 --- a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java +++ b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java @@ -176,28 +176,34 @@ public abstract class AbstractJournalTest { JournalWriter writer = journal.writer(); JournalReader reader = 
journal.openReader(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); writer.append(ENTRY); writer.append(ENTRY); writer.reset(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); writer.append(ENTRY); var indexed = assertNext(reader); assertEquals(1, indexed.index()); writer.reset(1); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); indexed = writer.append(ENTRY); - assertEquals(1, writer.getLastIndex()); + assertEquals(1, journal.lastIndex()); + assertEquals(2, writer.getNextIndex()); assertEquals(1, indexed.index()); indexed = assertNext(reader); assertEquals(1, indexed.index()); writer.truncate(0); - assertEquals(0, writer.getLastIndex()); + assertEquals(0, journal.lastIndex()); + assertEquals(1, writer.getNextIndex()); indexed = writer.append(ENTRY); - assertEquals(1, writer.getLastIndex()); + assertEquals(1, journal.lastIndex()); + assertEquals(2, writer.getNextIndex()); assertEquals(1, indexed.index()); indexed = assertNext(reader); diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index ad4c110bc8..bf1700f7f0 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -51,7 +51,7 @@ final class DataJournalV0 extends DataJournal { @Override long lastWrittenSequenceNr() { - return entries.writer().getLastIndex(); + return entries.lastIndex(); } @Override @@ -120,17 +120,18 @@ final class DataJournalV0 extends DataJournal 
{ long writtenBytes = 0; for (int i = 0; i < count; ++i) { - final long mark = writer.getLastIndex(); + final long prevNextIndex = writer.getNextIndex(); final var request = message.getRequest(i); final var reprs = CollectionConverters.asJava(request.payload()); - LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark); + LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), prevNextIndex); try { writtenBytes += writePayload(writer, reprs); } catch (Exception e) { - LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e); + LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, + prevNextIndex, e); responses.add(e); - writer.truncate(mark); + writer.reset(prevNextIndex); continue; } responses.add(null); diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 73ffab6a05..7e285f7d0d 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -494,9 +494,13 @@ abstract sealed class SegmentedJournalActor extends AbstractActor { } final var sw = Stopwatch.createStarted(); - deleteJournal = SegmentedJournal.builder().withDirectory(directory).withName("delete") - .withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build(); - final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.writer().getLastIndex()) + deleteJournal = SegmentedJournal.builder() + .withDirectory(directory) + .withName("delete") + .withNamespace(DELETE_NAMESPACE) + 
.withMaxSegmentSize(DELETE_SEGMENT_SIZE) + .build(); + final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex()) .tryNext((index, value, length) -> value); lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue(); -- 2.36.6