From bc005c333cb76e64b48eac215f7f8b938f7a4142 Mon Sep 17 00:00:00 2001 From: Ruslan Kashapov Date: Mon, 22 Apr 2024 17:24:04 +0300 Subject: [PATCH] Separate byte-level atomic-storage access Byte level functionality was moved into *ByteJournal* artifacts and now can be accessed independently. SegmentedJournal is now acts as a type serialization layer on top of ByteJournal. // FIXME: refactor SegmentedJournal.Builder (in a subsequent patch?) JIRA: CONTROLLER-2115 Change-Id: I2e4941bda3af76f0cd59e8c545131af85c668010 Signed-off-by: Ruslan Kashapov Signed-off-by: Robert Varga --- .../storage/journal/ByteBufJournal.java | 51 + ...tJournalReader.java => ByteBufMapper.java} | 26 +- .../atomix/storage/journal/ByteBufReader.java | 80 ++ .../atomix/storage/journal/ByteBufWriter.java | 79 ++ .../atomix/storage/journal/JournalReader.java | 4 +- .../storage/journal/JournalSegmentReader.java | 5 +- .../storage/journal/JournalSegmentWriter.java | 2 +- .../atomix/storage/journal/JournalSerdes.java | 25 +- .../storage/journal/JournalSerializer.java | 48 - .../atomix/storage/journal/JournalWriter.java | 2 + .../journal/SegmentedByteBufJournal.java | 596 +++++++++++ .../journal/SegmentedByteBufReader.java | 149 +++ .../journal/SegmentedByteBufWriter.java | 110 ++ .../SegmentedCommitsByteBufReader.java | 24 + .../storage/journal/SegmentedJournal.java | 941 ++++-------------- .../journal/SegmentedJournalReader.java | 131 +-- .../journal/SegmentedJournalWriter.java | 106 +- 17 files changed, 1407 insertions(+), 972 deletions(-) create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java rename atomix-storage/src/main/java/io/atomix/storage/journal/{CommitsSegmentJournalReader.java => ByteBufMapper.java} (59%) create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java delete mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java create mode 100644 atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java new file mode 100644 index 0000000000..baaa6b0ba9 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.atomix.storage.journal; + +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * A journal of byte arrays. Provides the ability to write modify entries via {@link ByteBufWriter} and read them + * back via {@link ByteBufReader}. + */ +@NonNullByDefault +public interface ByteBufJournal extends AutoCloseable { + /** + * Returns the journal writer. + * + * @return The journal writer. + */ + ByteBufWriter writer(); + + /** + * Opens a new {@link ByteBufReader} reading all entries. + * + * @param index The index at which to start the reader. + * @return A new journal reader. + */ + ByteBufReader openReader(long index); + + /** + * Opens a new {@link ByteBufReader} reading only committed entries. + * + * @param index The index at which to start the reader. + * @return A new journal reader. + */ + ByteBufReader openCommitsReader(long index); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java similarity index 59% rename from atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java rename to atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java index 767e67fa46..cabd48d8bd 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java @@ -15,19 +15,27 @@ */ package io.atomix.storage.journal; +import io.netty.buffer.ByteBuf; import org.eclipse.jdt.annotation.NonNullByDefault; /** - * A {@link JournalReader} traversing only committed entries. + * Support for serialization of {@link ByteBufJournal} entries. */ @NonNullByDefault -final class CommitsSegmentJournalReader extends SegmentedJournalReader { - CommitsSegmentJournalReader(final SegmentedJournal journal, final JournalSegment segment) { - super(journal, segment); - } +public interface ByteBufMapper { + /** + * Converts an object into a series of bytes in a {@link ByteBuf}. + * + * @param obj the object + * @return resulting buffer + */ + ByteBuf objectToBytes(T obj) ; - @Override - public T tryNext(final EntryMapper mapper) { - return getNextIndex() <= journal.getCommitIndex() ? super.tryNext(mapper) : null; - } + /** + * Converts the contents of a {@link ByteBuf} to an object. + * + * @param buf buffer to convert + * @return resulting object + */ + T bytesToObject(ByteBuf buf); } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java new file mode 100644 index 0000000000..1ebe81eef6 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
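A minimal ByteBufMapper implementation might look like the sketch below; StringMapper is a hypothetical example (not part of this patch) that stores String entries as UTF-8 bytes:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    // Hypothetical example mapper: String entries encoded as UTF-8.
    final class StringMapper implements ByteBufMapper<String> {
        @Override
        public ByteBuf objectToBytes(final String obj) {
            return Unpooled.wrappedBuffer(obj.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public String bytesToObject(final ByteBuf buf) {
            return buf.toString(StandardCharsets.UTF_8);
        }
    }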
+ */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * A reader of {@link ByteBufJournal} entries. + */ +@NonNullByDefault +public interface ByteBufReader extends AutoCloseable { + /** + * A journal entry processor. Responsible for transforming bytes into their internal representation. + * + * @param Internal representation type + */ + @FunctionalInterface + interface EntryMapper { + /** + * Process an entry. + * + * @param index entry index + * @param bytes entry bytes + * @return resulting internal representation + */ + T mapEntry(long index, ByteBuf bytes); + } + + /** + * Returns the first index in the journal. + * + * @return The first index in the journal + */ + long firstIndex(); + + /** + * Returns the next reader index. + * + * @return The next reader index + */ + long nextIndex(); + + /** + * Try to move to the next binary data block + * + * @param entryMapper callback to be invoked on binary data + * @return processed binary data, or {@code null} + */ + @Nullable T tryNext(EntryMapper entryMapper); + + /** + * Resets the reader to the start. + */ + void reset(); + + /** + * Resets the reader to the given index. + * + * @param index The index to which to reset the reader + */ + void reset(long index); + + @Override + void close(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java new file mode 100644 index 0000000000..7211a8844d --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNullByDefault; + +/** + * A writer of {@link ByteBufJournal} entries. + */ +@NonNullByDefault +public interface ByteBufWriter { + /** + * Returns the last written index. + * + * @return The last written index + */ + long lastIndex(); + + /** + * Returns the next index to be written. + * + * @return The next index to be written + */ + long nextIndex(); + + /** + * Appends an entry to the journal. + * + * @param bytes Data block to append + * @return The index of appended data block + */ + // FIXME: throws IOException + long append(ByteBuf bytes); + + /** + * Commits entries up to the given index. + * + * @param index The index up to which to commit entries. + */ + void commit(long index); + + /** + * Resets the head of the journal to the given index. + * + * @param index The index to which to reset the head of the journal + */ + // FIXME: reconcile with reader's reset and truncate() + // FIXME: throws IOException + void reset(long index); + + /** + * Truncates the log to the given index. + * + * @param index The index to which to truncate the log. 
+ */ + // FIXME: reconcile with reset() + // FIXME: throws IOException + void truncate(long index); + + /** + * Flushes written entries to disk. + */ + // FIXME: throws IOException + void flush(); +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java index a3c6ea5366..635f6248c4 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java @@ -75,10 +75,10 @@ public interface JournalReader extends AutoCloseable { /** * Try to move to the next entry. * - * @param mapper callback to be invoked for the entry + * @param entryMapper callback to be invoked for the entry * @return processed entry, or {@code null} */ - @Nullable T tryNext(EntryMapper mapper); + @Nullable T tryNext(EntryMapper entryMapper); /** * Resets the reader to the start. diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java index bba89dfdc9..aa4c0da18a 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java @@ -72,10 +72,9 @@ final class JournalSegmentReader { /** * Reads the next binary data block * - * @param index entry index * @return The binary data, or {@code null} */ - @Nullable ByteBuf readBytes(final long index) { + @Nullable ByteBuf readBytes() { // Check if there is enough in the buffer remaining final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES; if (remaining < 0) { @@ -112,7 +111,7 @@ final class JournalSegmentReader { position += SegmentEntry.HEADER_BYTES + length; // rewind and return - return Unpooled.buffer(length).writeBytes(entryBuffer.rewind()); + return Unpooled.wrappedBuffer(entryBuffer.rewind()); } /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java index 63f5303ecc..dbf6aec214 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java @@ -144,7 +144,7 @@ final class JournalSegmentWriter { reader.setPosition(JournalSegmentDescriptor.BYTES); while (index == 0 || nextIndex <= index) { - final var buf = reader.readBytes(nextIndex); + final var buf = reader.readBytes(); if (buf == null) { break; } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java index a970882edf..ffdc985827 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java @@ -19,6 +19,9 @@ package io.atomix.storage.journal; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import io.atomix.utils.serializer.KryoJournalSerdesBuilder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -27,7 +30,7 @@ import java.nio.ByteBuffer; /** * Support for serialization of {@link Journal} entries. 
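Taken together, ByteBufWriter and ByteBufReader allow a simple round trip. A sketch, assuming an already-built ByteBufJournal named "journal":

    final ByteBufWriter writer = journal.writer();
    final long index = writer.append(Unpooled.wrappedBuffer(new byte[] { 1, 2, 3 }));
    writer.commit(index);
    writer.flush();

    // Committed entries are visible through a commits-only reader.
    try (var reader = journal.openCommitsReader(index)) {
        final Integer size = reader.tryNext((idx, bytes) -> bytes.readableBytes());
        // size == 3 for the entry appended above
    }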
* - * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead. + * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead. */ @Deprecated(forRemoval = true, since="9.0.3") public interface JournalSerdes { @@ -110,6 +113,26 @@ public interface JournalSerdes { */ T deserialize(final InputStream stream, final int bufferSize); + /** + * Returns a {@link ByteBufMapper} backed by this object. + * + * @return a {@link ByteBufMapper} backed by this object + */ + default ByteBufMapper toMapper() { + return new ByteBufMapper<>() { + @Override + public ByteBuf objectToBytes(final T obj) { + return Unpooled.wrappedBuffer(serialize(obj)); + } + + @Override + public T bytesToObject(final ByteBuf buf) { + // FIXME: ByteBufUtil creates a copy -- we do not want to do that! + return deserialize(ByteBufUtil.getBytes(buf)); + } + }; + } + /** * Creates a new {@link JournalSerdes} builder. * diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java deleted file mode 100644 index eff9af8559..0000000000 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package io.atomix.storage.journal; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; - -/** - * Support for serialization of {@link Journal} entries. - */ -public interface JournalSerializer { - - /** - * Serializes given object to byte array. - * - * @param obj Object to serialize - * @return serialized bytes as {@link ByteBuf} - */ - ByteBuf serialize(T obj) ; - - /** - * Deserializes given byte array to Object. - * - * @param buf serialized bytes as {@link ByteBuf} - * @return deserialized Object - */ - T deserialize(final ByteBuf buf); - - static JournalSerializer wrap(final JournalSerdes serdes) { - return new JournalSerializer<>() { - @Override - public ByteBuf serialize(final E obj) { - return Unpooled.wrappedBuffer(serdes.serialize(obj)); - } - - @Override - public E deserialize(final ByteBuf buf) { - return serdes.deserialize(ByteBufUtil.getBytes(buf)); - } - }; - } -} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java index 064fd019ec..ba7c5821aa 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java @@ -57,6 +57,7 @@ public interface JournalWriter { * * @param index the index to which to reset the head of the journal */ + // FIXME: reconcile with reader's reset and truncate() void reset(long index); /** @@ -64,6 +65,7 @@ public interface JournalWriter { * * @param index The index to which to truncate the log. 
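Existing users can bridge their Kryo-based serialization into the new API via toMapper(). A migration sketch, where MyEntry is a hypothetical entry type assumed to be registered with the serdes:

    // Migration sketch: reuse the deprecated Kryo serdes behind the new interface.
    static ByteBufMapper<MyEntry> toByteBufMapper(final JournalSerdes serdes) {
        return serdes.toMapper();
    }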
*/ + // FIXME: reconcile with reset() void truncate(long index); /** diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java new file mode 100644 index 0000000000..3ae64ea82e --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java @@ -0,0 +1,596 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.function.BiFunction; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link ByteBufJournal} Implementation. + */ +public final class SegmentedByteBufJournal implements ByteBufJournal { + private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class); + private static final int SEGMENT_BUFFER_FACTOR = 3; + + private final ConcurrentNavigableMap segments = new ConcurrentSkipListMap<>(); + private final Collection readers = ConcurrentHashMap.newKeySet(); + private final String name; + private final StorageLevel storageLevel; + private final File directory; + private final int maxSegmentSize; + private final int maxEntrySize; + private final double indexDensity; + private final boolean flushOnCommit; + private final @NonNull ByteBufWriter writer; + + private JournalSegment currentSegment; + private volatile long commitIndex; + + public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory, + final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) { + this.name = requireNonNull(name, "name cannot be null"); + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); + this.directory = requireNonNull(directory, "directory cannot be null"); + this.maxSegmentSize = maxSegmentSize; + this.maxEntrySize = maxEntrySize; + this.indexDensity = indexDensity; + this.flushOnCommit = flushOnCommit; + open(); + writer = new SegmentedByteBufWriter(this); + } + + /** + * Returns the total size of the journal. 
+ * + * @return the total size of the journal + */ + public long size() { + return segments.values().stream() + .mapToLong(segment -> { + try { + return segment.file().size(); + } catch (IOException e) { + throw new StorageException(e); + } + }) + .sum(); + } + + @Override + public ByteBufWriter writer() { + return writer; + } + + @Override + public ByteBufReader openReader(final long index) { + return openReader(index, SegmentedByteBufReader::new); + } + + @NonNullByDefault + private ByteBufReader openReader(final long index, + final BiFunction constructor) { + final var reader = constructor.apply(this, segment(index)); + reader.reset(index); + readers.add(reader); + return reader; + } + + @Override + public ByteBufReader openCommitsReader(final long index) { + return openReader(index, SegmentedCommitsByteBufReader::new); + } + + /** + * Opens the segments. + */ + private synchronized void open() { + // Load existing log segments from disk. + for (var segment : loadSegments()) { + segments.put(segment.firstIndex(), segment); + } + // If a segment doesn't already exist, create an initial segment starting at index 1. + if (segments.isEmpty()) { + currentSegment = createSegment(1, 1); + segments.put(1L, currentSegment); + } else { + currentSegment = segments.lastEntry().getValue(); + } + } + + /** + * Asserts that the manager is open. + * + * @throws IllegalStateException if the segment manager is not open + */ + private void assertOpen() { + checkState(currentSegment != null, "journal not open"); + } + + /** + * Asserts that enough disk space is available to allocate a new segment. + */ + private void assertDiskSpace() { + if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) { + throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment"); + } + } + + /** + * Resets the current segment, creating a new segment if necessary. + */ + private synchronized void resetCurrentSegment() { + final var lastSegment = lastSegment(); + if (lastSegment == null) { + currentSegment = createSegment(1, 1); + segments.put(1L, currentSegment); + } else { + currentSegment = lastSegment; + } + } + + /** + * Resets and returns the first segment in the journal. + * + * @param index the starting index of the journal + * @return the first segment + */ + JournalSegment resetSegments(final long index) { + assertOpen(); + + // If the index already equals the first segment index, skip the reset. + final var firstSegment = firstSegment(); + if (index == firstSegment.firstIndex()) { + return firstSegment; + } + + segments.values().forEach(JournalSegment::delete); + segments.clear(); + + currentSegment = createSegment(1, index); + segments.put(index, currentSegment); + return currentSegment; + } + + /** + * Returns the first segment in the log. + * + * @throws IllegalStateException if the segment manager is not open + */ + JournalSegment firstSegment() { + assertOpen(); + final var firstEntry = segments.firstEntry(); + return firstEntry != null ? firstEntry.getValue() : nextSegment(); + } + + /** + * Returns the last segment in the log. + * + * @throws IllegalStateException if the segment manager is not open + */ + JournalSegment lastSegment() { + assertOpen(); + final var lastEntry = segments.lastEntry(); + return lastEntry != null ? lastEntry.getValue() : nextSegment(); + } + + /** + * Creates and returns the next segment. + * + * @return The next segment. 
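The two reader factories differ only in whether uncommitted entries are visible. A sketch, assuming a SegmentedByteBufJournal named "journal":

    // An ALL-mode reader also sees appended-but-uncommitted entries,
    // while a commits reader stops at the commit index.
    final ByteBufReader all = journal.openReader(1);
    final ByteBufReader committed = journal.openCommitsReader(1);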
+ * @throws IllegalStateException if the segment manager is not open + */ + synchronized JournalSegment nextSegment() { + assertOpen(); + assertDiskSpace(); + + final var index = currentSegment.lastIndex() + 1; + final var lastSegment = lastSegment(); + currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); + segments.put(index, currentSegment); + return currentSegment; + } + + /** + * Returns the segment following the segment with the given ID. + * + * @param index The segment index with which to look up the next segment. + * @return The next segment for the given index. + */ + JournalSegment nextSegment(final long index) { + final var higherEntry = segments.higherEntry(index); + return higherEntry != null ? higherEntry.getValue() : null; + } + + /** + * Returns the segment for the given index. + * + * @param index The index for which to return the segment. + * @throws IllegalStateException if the segment manager is not open + */ + synchronized JournalSegment segment(final long index) { + assertOpen(); + // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup. + if (currentSegment != null && index > currentSegment.firstIndex()) { + return currentSegment; + } + + // If the index is in another segment, get the entry with the next lowest first index. + final var segment = segments.floorEntry(index); + return segment != null ? segment.getValue() : firstSegment(); + } + + /** + * Removes a segment. + * + * @param segment The segment to remove. + */ + synchronized void removeSegment(final JournalSegment segment) { + segments.remove(segment.firstIndex()); + segment.delete(); + resetCurrentSegment(); + } + + /** + * Creates a new segment. + */ + JournalSegment createSegment(final long id, final long index) { + final JournalSegmentFile file; + try { + file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() + .withId(id) + .withIndex(index) + .withMaxSegmentSize(maxSegmentSize) + // FIXME: propagate maxEntries + .withMaxEntries(Integer.MAX_VALUE) + .withUpdated(System.currentTimeMillis()) + .build()); + } catch (IOException e) { + throw new StorageException(e); + } + + final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity); + LOG.debug("Created segment: {}", segment); + return segment; + } + + /** + * Loads all segments from disk. + * + * @return A collection of segments for the log. + */ + protected Collection loadSegments() { + // Ensure log directories are created. + directory.mkdirs(); + + final var segmentsMap = new TreeMap(); + + // Iterate through all files in the log directory. + for (var file : directory.listFiles(File::isFile)) { + + // If the file looks like a segment file, attempt to load the segment. + if (JournalSegmentFile.isSegmentFile(name, file)) { + final JournalSegmentFile segmentFile; + try { + segmentFile = JournalSegmentFile.openExisting(file.toPath()); + } catch (IOException e) { + throw new StorageException(e); + } + + // Load the segment. + LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path()); + + // Add the segment to the segments list. + final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); + segments.put(segment.firstIndex(), segment); + } + } + + // Verify that all the segments in the log align with one another. 
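Segment lookup relies on plain NavigableMap semantics: the owning segment is the entry with the greatest firstIndex not exceeding the requested index. A self-contained illustration of the floorEntry() idiom used by segment(long):

    import java.util.TreeMap;

    final var lookup = new TreeMap<Long, String>();
    lookup.put(1L, "segment-1");    // covers indexes 1..99
    lookup.put(100L, "segment-2");  // covers indexes 100 and up
    assert "segment-1".equals(lookup.floorEntry(42L).getValue());
    assert "segment-2".equals(lookup.floorEntry(150L).getValue());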
+ JournalSegment previousSegment = null; + boolean corrupted = false; + for (var iterator = segmentsMap.entrySet().iterator(); iterator.hasNext(); ) { + final var segment = iterator.next().getValue(); + if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) { + LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(), + previousSegment.file().path()); + corrupted = true; + } + if (corrupted) { + segment.delete(); + iterator.remove(); + } + previousSegment = segment; + } + + return segmentsMap.values(); + } + + /** + * Resets journal readers to the given head. + * + * @param index The index at which to reset readers. + */ + void resetHead(final long index) { + for (var reader : readers) { + if (reader.nextIndex() < index) { + reader.reset(index); + } + } + } + + /** + * Resets journal readers to the given tail. + * + * @param index The index at which to reset readers. + */ + void resetTail(final long index) { + for (var reader : readers) { + if (reader.nextIndex() >= index) { + reader.reset(index); + } + } + } + + void closeReader(final SegmentedByteBufReader reader) { + readers.remove(reader); + } + + /** + * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index. + * + * @param index the index from which to remove segments + * @return indicates whether a segment can be removed from the journal + */ + public boolean isCompactable(final long index) { + final var segmentEntry = segments.floorEntry(index); + return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0; + } + + /** + * Returns the index of the last segment in the log. + * + * @param index the compaction index + * @return the starting index of the last segment in the log + */ + public long getCompactableIndex(final long index) { + final var segmentEntry = segments.floorEntry(index); + return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0; + } + + /** + * Compacts the journal up to the given index. + *
+ * The semantics of compaction are not specified by this interface. + * + * @param index The index up to which to compact the journal. + */ + public void compact(final long index) { + final var segmentEntry = segments.floorEntry(index); + if (segmentEntry != null) { + final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex()); + if (!compactSegments.isEmpty()) { + LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); + compactSegments.values().forEach(JournalSegment::delete); + compactSegments.clear(); + resetHead(segmentEntry.getValue().firstIndex()); + } + } + } + + @Override + public void close() { + if (currentSegment != null) { + currentSegment = null; + segments.values().forEach(JournalSegment::close); + segments.clear(); + } + } + + /** + * Returns whether {@code flushOnCommit} is enabled for the log. + * + * @return Indicates whether {@code flushOnCommit} is enabled for the log. + */ + boolean isFlushOnCommit() { + return flushOnCommit; + } + + /** + * Updates commit index to the given value. + * + * @param index The index value. + */ + void setCommitIndex(final long index) { + commitIndex = index; + } + + /** + * Returns the journal last commit index. + * + * @return The journal last commit index. + */ + long getCommitIndex() { + return commitIndex; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Segmented byte journal builder. + */ + public static final class Builder { + private static final boolean DEFAULT_FLUSH_ON_COMMIT = false; + private static final String DEFAULT_NAME = "atomix"; + private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir"); + private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32; + private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; + private static final double DEFAULT_INDEX_DENSITY = .005; + + private String name = DEFAULT_NAME; + private StorageLevel storageLevel = StorageLevel.DISK; + private File directory = new File(DEFAULT_DIRECTORY); + private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; + private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; + private double indexDensity = DEFAULT_INDEX_DENSITY; + private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT; + + private Builder() { + // on purpose + } + + /** + * Sets the journal name. + * + * @param name The journal name. + * @return The builder instance + */ + public Builder withName(final String name) { + this.name = requireNonNull(name, "name cannot be null"); + return this; + } + + /** + * Sets the storage level. + * + * @param storageLevel The storage level. + * @return The builder instance + */ + public Builder withStorageLevel(final StorageLevel storageLevel) { + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); + return this; + } + + /** + * Sets the journal directory. + * + * @param directory The log directory. + * @return The builder instance + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final String directory) { + return withDirectory(new File(requireNonNull(directory, "directory cannot be null"))); + } + + /** + * Sets the journal directory + * + * @param directory The log directory. 
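Compaction continues to operate on whole segments only. A usage sketch, where "lastApplied" is an assumed application-side durability watermark and "journal" a SegmentedByteBufJournal:

    // Drop all segments that lie entirely below the watermark.
    if (journal.isCompactable(lastApplied)) {
        journal.compact(lastApplied);
    }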
+ * @return The builder instance + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final File directory) { + this.directory = requireNonNull(directory, "directory cannot be null"); + return this; + } + + /** + * Sets the maximum segment size in bytes. + * By default, the maximum segment size is {@code 1024 * 1024 * 32}. + * + * @param maxSegmentSize The maximum segment size in bytes. + * @return The builder instance + * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive + */ + public Builder withMaxSegmentSize(final int maxSegmentSize) { + checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, + "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES); + this.maxSegmentSize = maxSegmentSize; + return this; + } + + /** + * Sets the maximum entry size in bytes. + * + * @param maxEntrySize the maximum entry size in bytes + * @return the builder instance + * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive + */ + public Builder withMaxEntrySize(final int maxEntrySize) { + checkArgument(maxEntrySize > 0, "maxEntrySize must be positive"); + this.maxEntrySize = maxEntrySize; + return this; + } + + /** + * Sets the journal index density. + *
+ * The index density is the frequency at which the position of entries written to the journal will be + * recorded in an in-memory index for faster seeking. + * + * @param indexDensity the index density + * @return the builder instance + * @throws IllegalArgumentException if the density is not between 0 and 1 + */ + public Builder withIndexDensity(final double indexDensity) { + checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1"); + this.indexDensity = indexDensity; + return this; + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *
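As an illustration (assuming density is the recorded-position-to-entry ratio), the default of .005 retains roughly one position per 200 entries written; raising it trades memory for seek speed:

    // Sketch: record roughly one index position per 100 entries.
    final var builder = SegmentedByteBufJournal.builder().withIndexDensity(0.01);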
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time + * an entry is committed in a given segment. + * + * @return The builder instance + */ + public Builder withFlushOnCommit() { + return withFlushOnCommit(true); + } + + /** + * Sets whether to flush buffers to disk when entries are committed to a segment. + *
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time + * an entry is committed in a given segment. + * + * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. + * @return The builder instance + */ + public Builder withFlushOnCommit(final boolean flushOnCommit) { + this.flushOnCommit = flushOnCommit; + return this; + } + + /** + * Build the {@link SegmentedByteBufJournal}. + * + * @return {@link SegmentedByteBufJournal} instance built. + */ + public SegmentedByteBufJournal build() { + return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize, + indexDensity, flushOnCommit); + } + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java new file mode 100644 index 0000000000..d164676845 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java @@ -0,0 +1,149 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; +import org.eclipse.jdt.annotation.NonNull; + +/** + * A {@link ByteBufReader} implementation. + */ +sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader { + final @NonNull SegmentedByteBufJournal journal; + + private JournalSegment currentSegment; + private JournalSegmentReader currentReader; + private long nextIndex; + + SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) { + this.journal = requireNonNull(journal); + currentSegment = requireNonNull(segment); + currentReader = segment.createReader(); + nextIndex = currentSegment.firstIndex(); + } + + @Override + public final long firstIndex() { + return journal.firstSegment().firstIndex(); + } + + @Override + public final long nextIndex() { + return nextIndex; + } + + @Override + public final void reset() { + currentReader.close(); + currentSegment = journal.firstSegment(); + currentReader = currentSegment.createReader(); + nextIndex = currentSegment.firstIndex(); + } + + @Override + public final void reset(final long index) { + // If the current segment is not open, it has been replaced. Reset the segments. 
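A typical builder assembly; the name, path and sizes below are illustrative only:

    final SegmentedByteBufJournal journal = SegmentedByteBufJournal.builder()
        .withName("example")
        .withDirectory("/tmp/example-journal")
        .withMaxSegmentSize(32 * 1024 * 1024)
        .withFlushOnCommit()
        .build();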
+ if (!currentSegment.isOpen()) { + reset(); + } + if (index < nextIndex) { + rewind(index); + } else if (index > nextIndex) { + forwardTo(index); + } else { + resetCurrentReader(index); + } + } + + private void resetCurrentReader(final long index) { + final var position = currentSegment.lookup(index - 1); + if (position != null) { + nextIndex = position.index(); + currentReader.setPosition(position.position()); + } else { + nextIndex = currentSegment.firstIndex(); + currentReader.setPosition(JournalSegmentDescriptor.BYTES); + } + forwardTo(index); + } + + /** + * Rewinds the journal to the given index. + */ + private void rewind(final long index) { + if (currentSegment.firstIndex() >= index) { + final var segment = journal.segment(index - 1); + if (segment != null) { + currentReader.close(); + currentSegment = segment; + currentReader = currentSegment.createReader(); + } + } + resetCurrentReader(index); + } + + private void forwardTo(final long index) { + while (nextIndex < index && tryAdvance(nextIndex) != null) { + // No-op -- nextIndex value is updated in tryAdvance() + } + } + + @Override + public final T tryNext(final EntryMapper entryMapper) { + final var index = nextIndex; + final var bytes = tryAdvance(index); + return bytes == null ? null : entryMapper.mapEntry(index, bytes); + } + + /** + * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already + * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}. + * + *
+ * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by + * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}. + * + * @param index next index + * @return Entry bytes, or {@code null} + */ + ByteBuf tryAdvance(final long index) { + var buf = currentReader.readBytes(); + if (buf == null) { + final var nextSegment = journal.nextSegment(currentSegment.firstIndex()); + if (nextSegment == null || nextSegment.firstIndex() != index) { + return null; + } + currentReader.close(); + currentSegment = nextSegment; + currentReader = currentSegment.createReader(); + buf = currentReader.readBytes(); + if (buf == null) { + return null; + } + } + nextIndex = index + 1; + return buf; + } + + @Override + public final void close() { + currentReader.close(); + journal.closeReader(this); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java new file mode 100644 index 0000000000..7e92815f34 --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java @@ -0,0 +1,110 @@ +/* + * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ByteBufWriter} implementation. 
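From a caller's perspective the segment hand-off inside tryAdvance() is invisible. A replay sketch, assuming the "journal" from the builder example above:

    // Replay every retained entry; the mapper here just measures entry size.
    try (var reader = journal.openReader(1)) {
        Integer size;
        while ((size = reader.tryNext((idx, bytes) -> bytes.readableBytes())) != null) {
            // consume "size"; nextIndex() has already advanced past this entry
        }
    }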
+ */ +final class SegmentedByteBufWriter implements ByteBufWriter { + private final SegmentedByteBufJournal journal; + + private JournalSegment currentSegment; + private JournalSegmentWriter currentWriter; + + SegmentedByteBufWriter(final SegmentedByteBufJournal journal) { + this.journal = requireNonNull(journal); + currentSegment = journal.lastSegment(); + currentWriter = currentSegment.acquireWriter(); + } + + @Override + public long lastIndex() { + return currentWriter.getLastIndex(); + } + + @Override + public long nextIndex() { + return currentWriter.getNextIndex(); + } + + @Override + public void reset(final long index) { + if (index > currentSegment.firstIndex()) { + currentSegment.releaseWriter(); + currentSegment = journal.resetSegments(index); + currentWriter = currentSegment.acquireWriter(); + } else { + truncate(index - 1); + } + journal.resetHead(index); + } + + @Override + public void commit(final long index) { + if (index > journal.getCommitIndex()) { + journal.setCommitIndex(index); + if (journal.isFlushOnCommit()) { + flush(); + } + } + } + + @Override + public long append(final ByteBuf buf) { + var index = currentWriter.append(buf); + if (index != null) { + return index; + } + // Slow path: we do not have enough capacity + currentWriter.flush(); + currentSegment.releaseWriter(); + currentSegment = journal.nextSegment(); + currentWriter = currentSegment.acquireWriter(); + return verifyNotNull(currentWriter.append(buf)); + } + + @Override + public void truncate(final long index) { + if (index < journal.getCommitIndex()) { + throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); + } + + // Delete all segments with first indexes greater than the given index. + while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) { + currentSegment.releaseWriter(); + journal.removeSegment(currentSegment); + currentSegment = journal.lastSegment(); + currentWriter = currentSegment.acquireWriter(); + } + + // Truncate the current index. + currentWriter.truncate(index); + + // Reset segment readers. + journal.resetTail(index + 1); + } + + @Override + public void flush() { + currentWriter.flush(); + } +} diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java new file mode 100644 index 0000000000..43fae62a1e --- /dev/null +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package io.atomix.storage.journal; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ByteBufReader} traversing only committed entries. + */ +final class SegmentedCommitsByteBufReader extends SegmentedByteBufReader { + SegmentedCommitsByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) { + super(journal, segment); + } + + @Override + ByteBuf tryAdvance(final long index) { + return index <= journal.getCommitIndex() ? 
super.tryAdvance(index) : null; + } +} \ No newline at end of file diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 1ae77fa351..cd07492692 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -1,5 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,772 +16,258 @@ */ package io.atomix.storage.journal; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; /** * Segmented journal. */ public final class SegmentedJournal implements Journal { - /** - * Returns a new Raft log builder. - * - * @return A new Raft log builder. - */ - public static Builder builder() { - return new Builder<>(); - } - - private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class); - private static final int SEGMENT_BUFFER_FACTOR = 3; - - private final String name; - private final StorageLevel storageLevel; - private final File directory; - private final JournalSerializer serializer; - private final int maxSegmentSize; - private final int maxEntrySize; - private final int maxEntriesPerSegment; - private final double indexDensity; - private final boolean flushOnCommit; - private final SegmentedJournalWriter writer; - private volatile long commitIndex; - - private final ConcurrentNavigableMap segments = new ConcurrentSkipListMap<>(); - private final Collection> readers = ConcurrentHashMap.newKeySet(); - private JournalSegment currentSegment; - - private volatile boolean open = true; - - public SegmentedJournal( - String name, - StorageLevel storageLevel, - File directory, - JournalSerdes namespace, - int maxSegmentSize, - int maxEntrySize, - int maxEntriesPerSegment, - double indexDensity, - boolean flushOnCommit) { - this.name = requireNonNull(name, "name cannot be null"); - this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); - this.directory = requireNonNull(directory, "directory cannot be null"); - this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null")); - this.maxSegmentSize = maxSegmentSize; - this.maxEntrySize = maxEntrySize; - this.maxEntriesPerSegment = maxEntriesPerSegment; - this.indexDensity = indexDensity; - this.flushOnCommit = flushOnCommit; - open(); - this.writer = new SegmentedJournalWriter<>(this); - } - - /** - * Returns the segment file name prefix. - * - * @return The segment file name prefix. - */ - public String name() { - return name; - } - - /** - * Returns the storage directory. - *
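With the split in place, a type-aware journal is assembled from the two pieces; StringMapper is the hypothetical mapper sketched earlier:

    final SegmentedJournal<String> typed = new SegmentedJournal<>(journal, new StringMapper());
    final JournalReader<String> reader = typed.openReader(1, JournalReader.Mode.COMMITS);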
- * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be - * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided - * when the log is opened. - * - * @return The storage directory. - */ - public File directory() { - return directory; - } - - /** - * Returns the storage level. - *
- * The storage level dictates how entries within individual journal segments should be stored. - * - * @return The storage level. - */ - public StorageLevel storageLevel() { - return storageLevel; - } - - /** - * Returns the maximum journal segment size. - *
- * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes. - * - * @return The maximum segment size in bytes. - */ - public int maxSegmentSize() { - return maxSegmentSize; - } - - /** - * Returns the maximum journal entry size. - *
- * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes. - * - * @return the maximum entry size in bytes - */ - public int maxEntrySize() { - return maxEntrySize; - } - - /** - * Returns the maximum number of entries per segment. - *
- * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment - * in a journal. - * - * @return The maximum number of entries per segment. - * @deprecated since 3.0.2 - */ - @Deprecated - public int maxEntriesPerSegment() { - return maxEntriesPerSegment; - } - - /** - * Returns the collection of journal segments. - * - * @return the collection of journal segments - */ - public Collection segments() { - return segments.values(); - } - - /** - * Returns the collection of journal segments with indexes greater than the given index. - * - * @param index the starting index - * @return the journal segments starting with indexes greater than or equal to the given index - */ - public Collection segments(long index) { - return segments.tailMap(index).values(); - } - - /** - * Returns serializer instance. - * - * @return serializer instance - */ - JournalSerializer serializer() { - return serializer; - } - - /** - * Returns the total size of the journal. - * - * @return the total size of the journal - */ - public long size() { - return segments.values().stream() - .mapToLong(segment -> { - try { - return segment.file().size(); - } catch (IOException e) { - throw new StorageException(e); - } - }) - .sum(); - } - - @Override - public JournalWriter writer() { - return writer; - } - - @Override - public JournalReader openReader(long index) { - return openReader(index, JournalReader.Mode.ALL); - } - - /** - * Opens a new Raft log reader with the given reader mode. - * - * @param index The index from which to begin reading entries. - * @param mode The mode in which to read entries. - * @return The Raft log reader. - */ - @Override - public JournalReader openReader(long index, JournalReader.Mode mode) { - final var segment = getSegment(index); - final var reader = switch (mode) { - case ALL -> new SegmentedJournalReader<>(this, segment); - case COMMITS -> new CommitsSegmentJournalReader<>(this, segment); - }; - - // Forward reader to specified index - long next = reader.getNextIndex(); - while (index > next && reader.tryAdvance()) { - next = reader.getNextIndex(); + private final AtomicBoolean open = new AtomicBoolean(true); + private final SegmentedByteBufJournal journal; + private final SegmentedJournalWriter writer; + private final ByteBufMapper mapper; + + public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper mapper) { + this.journal = requireNonNull(journal, "journal is required"); + this.mapper = requireNonNull(mapper, "mapper cannot be null"); + writer = new SegmentedJournalWriter<>(journal.writer(), mapper); } - readers.add(reader); - return reader; - } - - /** - * Opens the segments. - */ - private synchronized void open() { - // Load existing log segments from disk. - for (var segment : loadSegments()) { - segments.put(segment.firstIndex(), segment); + @Override + public JournalWriter writer() { + return writer; } - // If a segment doesn't already exist, create an initial segment starting at index 1. - if (!segments.isEmpty()) { - currentSegment = segments.lastEntry().getValue(); - } else { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } - } - - /** - * Asserts that the manager is open. - * - * @throws IllegalStateException if the segment manager is not open - */ - private void assertOpen() { - checkState(currentSegment != null, "journal not open"); - } - - /** - * Asserts that enough disk space is available to allocate a new segment. 
- */ - private void assertDiskSpace() { - if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) { - throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment"); - } - } - - /** - * Resets the current segment, creating a new segment if necessary. - */ - private synchronized void resetCurrentSegment() { - final var lastSegment = getLastSegment(); - if (lastSegment == null) { - currentSegment = createSegment(1, 1); - segments.put(1L, currentSegment); - } else { - currentSegment = lastSegment; - } - } - - /** - * Resets and returns the first segment in the journal. - * - * @param index the starting index of the journal - * @return the first segment - */ - JournalSegment resetSegments(long index) { - assertOpen(); - - // If the index already equals the first segment index, skip the reset. - final var firstSegment = getFirstSegment(); - if (index == firstSegment.firstIndex()) { - return firstSegment; - } - - segments.values().forEach(JournalSegment::delete); - segments.clear(); - - currentSegment = createSegment(1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the first segment in the log. - * - * @throws IllegalStateException if the segment manager is not open - */ - JournalSegment getFirstSegment() { - assertOpen(); - Map.Entry segment = segments.firstEntry(); - return segment != null ? segment.getValue() : null; - } - - /** - * Returns the last segment in the log. - * - * @throws IllegalStateException if the segment manager is not open - */ - JournalSegment getLastSegment() { - assertOpen(); - Map.Entry segment = segments.lastEntry(); - return segment != null ? segment.getValue() : null; - } - - /** - * Creates and returns the next segment. - * - * @return The next segment. - * @throws IllegalStateException if the segment manager is not open - */ - synchronized JournalSegment getNextSegment() { - assertOpen(); - assertDiskSpace(); - - final var index = currentSegment.lastIndex() + 1; - final var lastSegment = getLastSegment(); - currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index); - segments.put(index, currentSegment); - return currentSegment; - } - - /** - * Returns the segment following the segment with the given ID. - * - * @param index The segment index with which to look up the next segment. - * @return The next segment for the given index. - */ - JournalSegment getNextSegment(long index) { - Map.Entry nextSegment = segments.higherEntry(index); - return nextSegment != null ? nextSegment.getValue() : null; - } - - /** - * Returns the segment for the given index. - * - * @param index The index for which to return the segment. - * @throws IllegalStateException if the segment manager is not open - */ - synchronized JournalSegment getSegment(long index) { - assertOpen(); - // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup. - if (currentSegment != null && index > currentSegment.firstIndex()) { - return currentSegment; - } - - // If the index is in another segment, get the entry with the next lowest first index. - Map.Entry segment = segments.floorEntry(index); - if (segment != null) { - return segment.getValue(); - } - return getFirstSegment(); - } - - /** - * Removes a segment. - * - * @param segment The segment to remove. 
- */ - synchronized void removeSegment(JournalSegment segment) { - segments.remove(segment.firstIndex()); - segment.delete(); - resetCurrentSegment(); - } - - /** - * Creates a new segment. - */ - JournalSegment createSegment(long id, long index) { - final JournalSegmentFile file; - try { - file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder() - .withId(id) - .withIndex(index) - .withMaxSegmentSize(maxSegmentSize) - .withMaxEntries(maxEntriesPerSegment) - .withUpdated(System.currentTimeMillis()) - .build()); - } catch (IOException e) { - throw new StorageException(e); - } - - final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity); - LOG.debug("Created segment: {}", segment); - return segment; - } - - /** - * Loads all segments from disk. - * - * @return A collection of segments for the log. - */ - protected Collection loadSegments() { - // Ensure log directories are created. - directory.mkdirs(); - - final var segments = new TreeMap(); - - // Iterate through all files in the log directory. - for (var file : directory.listFiles(File::isFile)) { - - // If the file looks like a segment file, attempt to load the segment. - if (JournalSegmentFile.isSegmentFile(name, file)) { - final JournalSegmentFile segmentFile; - try { - segmentFile = JournalSegmentFile.openExisting(file.toPath()); - } catch (IOException e) { - throw new StorageException(e); - } - - // Load the segment. - LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path()); - - // Add the segment to the segments list. - final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity); - segments.put(segment.firstIndex(), segment); - } - } - - // Verify that all the segments in the log align with one another. - JournalSegment previousSegment = null; - boolean corrupted = false; - final var iterator = segments.entrySet().iterator(); - while (iterator.hasNext()) { - final var segment = iterator.next().getValue(); - if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) { - LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(), - previousSegment.file().path()); - corrupted = true; - } - if (corrupted) { - segment.delete(); - iterator.remove(); - } - previousSegment = segment; - } - - return segments.values(); - } - - /** - * Resets journal readers to the given head. - * - * @param index The index at which to reset readers. - */ - void resetHead(long index) { - for (var reader : readers) { - if (reader.getNextIndex() < index) { - reader.reset(index); - } - } - } - - /** - * Resets journal readers to the given tail. - * - * @param index The index at which to reset readers. - */ - void resetTail(long index) { - for (var reader : readers) { - if (reader.getNextIndex() >= index) { - reader.reset(index); - } - } - } - - void closeReader(SegmentedJournalReader reader) { - readers.remove(reader); - } - - @Override - public boolean isOpen() { - return open; - } - - /** - * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index. 
- * - * @param index the index from which to remove segments - * @return indicates whether a segment can be removed from the journal - */ - public boolean isCompactable(long index) { - Map.Entry segmentEntry = segments.floorEntry(index); - return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0; - } - - /** - * Returns the index of the last segment in the log. - * - * @param index the compaction index - * @return the starting index of the last segment in the log - */ - public long getCompactableIndex(long index) { - Map.Entry segmentEntry = segments.floorEntry(index); - return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0; - } - - /** - * Compacts the journal up to the given index. - *
- * The semantics of compaction are not specified by this interface. - * - * @param index The index up to which to compact the journal. - */ - public void compact(long index) { - final var segmentEntry = segments.floorEntry(index); - if (segmentEntry != null) { - final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex()); - if (!compactSegments.isEmpty()) { - LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); - compactSegments.values().forEach(JournalSegment::delete); - compactSegments.clear(); - resetHead(segmentEntry.getValue().firstIndex()); - } - } - } - - @Override - public void close() { - segments.values().forEach(JournalSegment::close); - currentSegment = null; - open = false; - } - - /** - * Returns whether {@code flushOnCommit} is enabled for the log. - * - * @return Indicates whether {@code flushOnCommit} is enabled for the log. - */ - boolean isFlushOnCommit() { - return flushOnCommit; - } - - /** - * Commits entries up to the given index. - * - * @param index The index up to which to commit entries. - */ - void setCommitIndex(long index) { - this.commitIndex = index; - } - - /** - * Returns the Raft log commit index. - * - * @return The Raft log commit index. - */ - long getCommitIndex() { - return commitIndex; - } - - /** - * Raft log builder. - */ - public static final class Builder { - private static final boolean DEFAULT_FLUSH_ON_COMMIT = false; - private static final String DEFAULT_NAME = "atomix"; - private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir"); - private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32; - private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; - private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024; - private static final double DEFAULT_INDEX_DENSITY = .005; - - private String name = DEFAULT_NAME; - private StorageLevel storageLevel = StorageLevel.DISK; - private File directory = new File(DEFAULT_DIRECTORY); - private JournalSerdes namespace; - private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; - private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; - private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; - private double indexDensity = DEFAULT_INDEX_DENSITY; - private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT; - - Builder() { - // Hidden on purpose + @Override + public JournalReader openReader(final long index) { + return openReader(index, JournalReader.Mode.ALL); } /** - * Sets the storage name. + * Opens a new journal reader with the given reader mode. * - * @param name The storage name. - * @return The storage builder. + * @param index The index from which to begin reading entries. + * @param mode The mode in which to read entries. + * @return The journal reader. */ - public Builder withName(String name) { - this.name = requireNonNull(name, "name cannot be null"); - return this; + @Override + public JournalReader openReader(final long index, final JournalReader.Mode mode) { + final var byteReader = switch (mode) { + case ALL -> journal.openReader(index); + case COMMITS -> journal.openCommitsReader(index); + }; + return new SegmentedJournalReader<>(byteReader, mapper); } - /** - * Sets the log storage level, returning the builder for method chaining. - *
- * The storage level indicates how individual entries should be persisted in the journal. - * - * @param storageLevel The log storage level. - * @return The storage builder. - */ - public Builder withStorageLevel(StorageLevel storageLevel) { - this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); - return this; + @Override + public boolean isOpen() { + return open.get(); } - /** - * Sets the log directory, returning the builder for method chaining. - *
- * The log will write segment files into the provided directory. - * - * @param directory The log directory. - * @return The storage builder. - * @throws NullPointerException If the {@code directory} is {@code null} - */ - public Builder withDirectory(String directory) { - return withDirectory(new File(requireNonNull(directory, "directory cannot be null"))); + @Override + public void close() { + if (open.compareAndExchange(true, false)) { + journal.close(); + } } /** - * Sets the log directory, returning the builder for method chaining. + * Compacts the journal up to the given index. *
- * The log will write segment files into the provided directory. + * The semantics of compaction are not specified by this interface. * - * @param directory The log directory. - * @return The storage builder. - * @throws NullPointerException If the {@code directory} is {@code null} + * @param index The index up to which to compact the journal. */ - public Builder withDirectory(File directory) { - this.directory = requireNonNull(directory, "directory cannot be null"); - return this; + public void compact(final long index) { + journal.compact(index); } /** - * Sets the journal namespace, returning the builder for method chaining. + * Returns a new segmented journal builder. * - * @param namespace The journal serializer. - * @return The journal builder. + * @return A new segmented journal builder. */ - public Builder withNamespace(JournalSerdes namespace) { - this.namespace = requireNonNull(namespace, "namespace cannot be null"); - return this; + public static Builder builder() { + return new Builder<>(); } - /** - * Sets the maximum segment size in bytes, returning the builder for method chaining. - *
- * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
- * segment and append new entries to that segment.
- * <p>
- * By default, the maximum segment size is {@code 1024 * 1024 * 32}. - * - * @param maxSegmentSize The maximum segment size in bytes. - * @return The storage builder. - * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive - */ - public Builder withMaxSegmentSize(int maxSegmentSize) { - checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, - "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES); - this.maxSegmentSize = maxSegmentSize; - return this; - } + public static final class Builder { + private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder(); + private ByteBufMapper mapper; - /** - * Sets the maximum entry size in bytes, returning the builder for method chaining. - * - * @param maxEntrySize the maximum entry size in bytes - * @return the storage builder - * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive - */ - public Builder withMaxEntrySize(int maxEntrySize) { - checkArgument(maxEntrySize > 0, "maxEntrySize must be positive"); - this.maxEntrySize = maxEntrySize; - return this; - } + private Builder() { + // on purpose + } - /** - * Sets the maximum number of allows entries per segment, returning the builder for method chaining. - *
- * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
- * new segment and append new entries to that segment.
- * <p>
- * By default, the maximum entries per segment is {@code 1024 * 1024}. - * - * @param maxEntriesPerSegment The maximum number of entries allowed per segment. - * @return The storage builder. - * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries - * per segment - * @deprecated since 3.0.2 - */ - @Deprecated - public Builder withMaxEntriesPerSegment(int maxEntriesPerSegment) { - checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive"); - checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT, - "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT); - this.maxEntriesPerSegment = maxEntriesPerSegment; - return this; - } + /** + * Sets the journal name. + * + * @param name The journal name. + * @return The journal builder. + */ + public Builder withName(final String name) { + byteJournalBuilder.withName(name); + return this; + } - /** - * Sets the journal index density. - *
- * The index density is the frequency at which the position of entries written to the journal will be recorded in an - * in-memory index for faster seeking. - * - * @param indexDensity the index density - * @return the journal builder - * @throws IllegalArgumentException if the density is not between 0 and 1 - */ - public Builder withIndexDensity(double indexDensity) { - checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1"); - this.indexDensity = indexDensity; - return this; - } + /** + * Sets the journal storage level. + *
+ * The storage level indicates how individual entries will be persisted in the journal. + * + * @param storageLevel The log storage level. + * @return The journal builder. + */ + public Builder withStorageLevel(final StorageLevel storageLevel) { + byteJournalBuilder.withStorageLevel(storageLevel); + return this; + } - /** - * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method - * chaining. - *
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is - * committed in a given segment. - * - * @return The storage builder. - */ - public Builder withFlushOnCommit() { - return withFlushOnCommit(true); - } + /** + * Sets the journal storage directory. + *
+ * The journal will write segment files into the provided directory. + * + * @param directory The journal storage directory. + * @return The journal builder. + * @throws NullPointerException If the {@code directory} is {@code null} + */ + public Builder withDirectory(final String directory) { + byteJournalBuilder.withDirectory(directory); + return this; + } - /** - * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method - * chaining. - *
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is - * committed in a given segment. - * - * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. - * @return The storage builder. - */ - public Builder withFlushOnCommit(boolean flushOnCommit) { - this.flushOnCommit = flushOnCommit; - return this; - } + /** + * Sets the journal storage directory. + *
+ * <p>The journal will write segment files into the provided directory.
+ *
+ * @param directory The journal storage directory.
+ * @return The journal builder.
+ * @throws NullPointerException If the {@code directory} is {@code null}
+ */
+ public Builder<E> withDirectory(final File directory) {
+ byteJournalBuilder.withDirectory(directory);
+ return this;
+ }
- /**
- * Build the {@link SegmentedJournal}.
- *
- * @return A new {@link SegmentedJournal}.
- */
- public SegmentedJournal<E> build() {
- return new SegmentedJournal<>(
- name,
- storageLevel,
- directory,
- namespace,
- maxSegmentSize,
- maxEntrySize,
- maxEntriesPerSegment,
- indexDensity,
- flushOnCommit);
+ /**
+ * Sets the journal namespace.
+ *
+ * @param namespace The journal serializer.
+ * @return The journal builder.
+ * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
+ */
+ @Deprecated(forRemoval = true, since = "9.0.3")
+ public Builder<E> withNamespace(final JournalSerdes namespace) {
+ return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
+ }
+
+ /**
+ * Sets the journal serializer.
+ *
+ * @param mapper The journal serializer.
+ * @return The journal builder.
+ */
+ public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
+ this.mapper = requireNonNull(mapper);
+ return this;
+ }
+
+ /**
+ * Sets the maximum segment size in bytes.
+ *
+ * <p>The maximum segment size dictates when the journal should roll over to new segments. As entries are written
+ * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
+ * journal will create a new segment and append new entries to that segment.
+ *
+ * By default, the maximum segment size is 32M. + * + * @param maxSegmentSize The maximum segment size in bytes. + * @return The storage builder. + * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive + */ + public Builder withMaxSegmentSize(final int maxSegmentSize) { + byteJournalBuilder.withMaxSegmentSize(maxSegmentSize); + return this; + } + + /** + * Sets the maximum entry size in bytes. + * + * @param maxEntrySize the maximum entry size in bytes + * @return the storage builder + * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive + */ + public Builder withMaxEntrySize(final int maxEntrySize) { + byteJournalBuilder.withMaxEntrySize(maxEntrySize); + return this; + } + + /** + * Sets the maximum number of entries per segment. + * + * @param maxEntriesPerSegment The maximum number of entries allowed per segment. + * @return The journal builder. + * @deprecated since 3.0.2, no longer used + */ + @Deprecated + public Builder withMaxEntriesPerSegment(final int maxEntriesPerSegment) { + // ignore + return this; + } + + /** + * Sets the journal index density. + *
+ * The index density is the frequency at which the position of entries written to the journal will be recorded + * in an in-memory index for faster seeking. + * + * @param indexDensity the index density + * @return the journal builder + * @throws IllegalArgumentException if the density is not between 0 and 1 + */ + public Builder withIndexDensity(final double indexDensity) { + byteJournalBuilder.withIndexDensity(indexDensity); + return this; + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an + * entry is committed in a given segment. + * + * @return The journal builder. + */ + public Builder withFlushOnCommit() { + return withFlushOnCommit(true); + } + + /** + * Enables flushing buffers to disk when entries are committed to a segment. + *
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an + * entry is committed in a given segment. + * + * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment. + * @return The journal builder. + */ + public Builder withFlushOnCommit(final boolean flushOnCommit) { + byteJournalBuilder.withFlushOnCommit(flushOnCommit); + return this; + } + + /** + * Build the {@link SegmentedJournal}. + * + * @return {@link SegmentedJournal} instance. + */ + public SegmentedJournal build() { + return new SegmentedJournal<>(byteJournalBuilder.build(), mapper); + } } - } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java index a5deb6382e..f28390c84b 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java @@ -1,6 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. - * Copyright (c) 2024 PANTHEON.tech, s.r.o. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,134 +18,49 @@ package io.atomix.storage.journal; import static java.util.Objects.requireNonNull; -import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; /** - * A {@link JournalReader} traversing all entries. + * A {@link JournalReader} backed by a {@link ByteBufReader}. */ -sealed class SegmentedJournalReader implements JournalReader permits CommitsSegmentJournalReader { - // Marker non-null object for tryAdvance() - private static final @NonNull Object ADVANCED = new Object(); +final class SegmentedJournalReader implements JournalReader { + private final ByteBufMapper mapper; + private final ByteBufReader reader; - final SegmentedJournal journal; - - private JournalSegment currentSegment; - private JournalSegmentReader currentReader; - private long nextIndex; - - SegmentedJournalReader(final SegmentedJournal journal, final JournalSegment segment) { - this.journal = requireNonNull(journal); - currentSegment = requireNonNull(segment); - currentReader = segment.createReader(); - nextIndex = currentSegment.firstIndex(); + SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper mapper) { + this.reader = requireNonNull(reader); + this.mapper = requireNonNull(mapper); } @Override - public final long getFirstIndex() { - return journal.getFirstSegment().firstIndex(); + public long getFirstIndex() { + return reader.firstIndex(); } @Override - public final long getNextIndex() { - return nextIndex; + public long getNextIndex() { + return reader.nextIndex(); } @Override - public final void reset() { - currentReader.close(); - - currentSegment = journal.getFirstSegment(); - currentReader = currentSegment.createReader(); - nextIndex = currentSegment.firstIndex(); + public void reset() { + reader.reset(); } @Override - public final void reset(final long index) { - // If the current segment is not open, it has been replaced. Reset the segments. 
- if (!currentSegment.isOpen()) { - reset(); - } - - if (index < nextIndex) { - rewind(index); - } else if (index > nextIndex) { - while (index > nextIndex && tryAdvance()) { - // Nothing else - } - } else { - resetCurrentReader(index); - } - } - - private void resetCurrentReader(final long index) { - final var position = currentSegment.lookup(index - 1); - if (position != null) { - nextIndex = position.index(); - currentReader.setPosition(position.position()); - } else { - nextIndex = currentSegment.firstIndex(); - currentReader.setPosition(JournalSegmentDescriptor.BYTES); - } - while (nextIndex < index && tryAdvance()) { - // Nothing else - } - } - - /** - * Rewinds the journal to the given index. - */ - private void rewind(final long index) { - if (currentSegment.firstIndex() >= index) { - JournalSegment segment = journal.getSegment(index - 1); - if (segment != null) { - currentReader.close(); - - currentSegment = segment; - currentReader = currentSegment.createReader(); - } - } - - resetCurrentReader(index); + public void reset(final long index) { + reader.reset(index); } @Override - public T tryNext(final EntryMapper mapper) { - final var index = nextIndex; - var buf = currentReader.readBytes(index); - if (buf == null) { - final var nextSegment = journal.getNextSegment(currentSegment.firstIndex()); - if (nextSegment == null || nextSegment.firstIndex() != index) { - return null; - } - - currentReader.close(); - - currentSegment = nextSegment; - currentReader = currentSegment.createReader(); - buf = currentReader.readBytes(index); - if (buf == null) { - return null; - } - } - - final var entry = journal.serializer().deserialize(buf); - final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes())); - nextIndex = index + 1; - return ret; - } - - /** - * Try to move to the next entry. - * - * @return {@code true} if there was a next entry and this reader has moved to it - */ - final boolean tryAdvance() { - return tryNext((index, entry, size) -> ADVANCED) != null; + public @Nullable T tryNext(final EntryMapper entryMapper) { + return reader.tryNext( + (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes())) + ); } @Override - public final void close() { - currentReader.close(); - journal.closeReader(this); + public void close() { + reader.close(); } } diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java index 71120891a1..7c331ccb24 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java @@ -1,5 +1,6 @@ /* * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved. + * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,94 +16,53 @@ */ package io.atomix.storage.journal; -import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; /** - * Raft log writer. + * A {@link JournalWriter} backed by a {@link ByteBufWriter}. 
*/ final class SegmentedJournalWriter implements JournalWriter { - private final SegmentedJournal journal; - private JournalSegment currentSegment; - private JournalSegmentWriter currentWriter; + private final ByteBufMapper mapper; + private final ByteBufWriter writer; - SegmentedJournalWriter(SegmentedJournal journal) { - this.journal = journal; - this.currentSegment = journal.getLastSegment(); - this.currentWriter = currentSegment.acquireWriter(); - } - - @Override - public long getLastIndex() { - return currentWriter.getLastIndex(); - } - - @Override - public long getNextIndex() { - return currentWriter.getNextIndex(); - } - - @Override - public void reset(long index) { - if (index > currentSegment.firstIndex()) { - currentSegment.releaseWriter(); - currentSegment = journal.resetSegments(index); - currentWriter = currentSegment.acquireWriter(); - } else { - truncate(index - 1); + SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper mapper) { + this.writer = requireNonNull(writer); + this.mapper = requireNonNull(mapper); } - journal.resetHead(index); - } - @Override - public void commit(long index) { - if (index > journal.getCommitIndex()) { - journal.setCommitIndex(index); - if (journal.isFlushOnCommit()) { - flush(); - } + @Override + public long getLastIndex() { + return writer.lastIndex(); } - } - @Override - public Indexed append(T entry) { - final var bytes = journal.serializer().serialize(entry); - var index = currentWriter.append(bytes); - if (index != null) { - return new Indexed<>(index, entry, bytes.readableBytes()); + @Override + public long getNextIndex() { + return writer.nextIndex(); } - // Slow path: we do not have enough capacity - currentWriter.flush(); - currentSegment.releaseWriter(); - currentSegment = journal.getNextSegment(); - currentWriter = currentSegment.acquireWriter(); - final var newIndex = verifyNotNull(currentWriter.append(bytes)); - return new Indexed<>(newIndex, entry, bytes.readableBytes()); - } - - @Override - public void truncate(long index) { - if (index < journal.getCommitIndex()) { - throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index); + @Override + public void reset(final long index) { + writer.reset(index); } - // Delete all segments with first indexes greater than the given index. - while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) { - currentSegment.releaseWriter(); - journal.removeSegment(currentSegment); - currentSegment = journal.getLastSegment(); - currentWriter = currentSegment.acquireWriter(); + @Override + public void commit(final long index) { + writer.commit(index); } - // Truncate the current index. - currentWriter.truncate(index); + @Override + public Indexed append(final T entry) { + final var buf = mapper.objectToBytes(entry); + return new Indexed<>(writer.append(buf), entry, buf.readableBytes()); + } - // Reset segment readers. - journal.resetTail(index + 1); - } + @Override + public void truncate(final long index) { + writer.truncate(index); + } - @Override - public void flush() { - currentWriter.flush(); - } + @Override + public void flush() { + writer.flush(); + } } -- 2.36.6
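
For context, here is a minimal usage sketch of the refactored API: a ByteBufMapper supplies the type-serialization layer, while SegmentedJournal delegates byte-level storage to SegmentedByteBufJournal. The sketch is hypothetical and not part of the patch: the String mapper, the "example" journal name and the directory are stand-ins, and it assumes SegmentedJournal.builder() is generic in the entry type, that builder defaults are acceptable, and that indices start at 1.

    // Illustrative only -- not part of this patch.
    import io.atomix.storage.journal.ByteBufMapper;
    import io.atomix.storage.journal.JournalReader;
    import io.atomix.storage.journal.SegmentedJournal;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    public final class JournalExample {
        // Trivial stand-in mapper: stores String entries as UTF-8 bytes.
        private static final ByteBufMapper<String> STRING_MAPPER = new ByteBufMapper<>() {
            @Override
            public ByteBuf objectToBytes(final String obj) {
                return Unpooled.copiedBuffer(obj, StandardCharsets.UTF_8);
            }

            @Override
            public String bytesToObject(final ByteBuf buf) {
                return buf.toString(StandardCharsets.UTF_8);
            }
        };

        public static void main(final String[] args) {
            // SegmentedJournal is now a typed facade over the byte-level journal.
            final SegmentedJournal<String> journal = SegmentedJournal.<String>builder()
                .withName("example")
                .withDirectory("/tmp/journal-example")
                .withMapper(STRING_MAPPER)
                .build();

            // Append two entries, then mark them committed.
            final var writer = journal.writer();
            writer.append("first");
            writer.append("second");
            writer.commit(writer.getLastIndex());

            // Iterate committed entries from index 1; tryNext() returns null when exhausted.
            final var reader = journal.openReader(1, JournalReader.Mode.COMMITS);
            String entry;
            while ((entry = reader.tryNext((index, value, size) -> value)) != null) {
                System.out.println(entry);
            }

            reader.close();
            journal.close();
        }
    }

The round trip exercises both sides of the split: SegmentedJournalWriter converts each entry to a ByteBuf via the mapper and hands it to ByteBufWriter.append(), while the COMMITS-mode reader stops at the commit index, mirroring the behaviour of the former CommitsSegmentJournalReader.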