Separate byte-level atomic-storage access 27/111227/20
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Mon, 22 Apr 2024 14:24:04 +0000 (17:24 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 7 May 2024 18:46:30 +0000 (20:46 +0200)
Byte level functionality was moved into *ByteJournal*
artifacts and now can be accessed independently.
SegmentedJournal is now acts as a type serialization
layer on top of ByteJournal.

// FIXME: refactor SegmentedJournal.Builder (in a subsequent patch?)

JIRA: CONTROLLER-2115
Change-Id: I2e4941bda3af76f0cd59e8c545131af85c668010
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
17 files changed:
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java [moved from atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java with 59% similarity]
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/JournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java [deleted file]
atomix-storage/src/main/java/io/atomix/storage/journal/JournalWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java

diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
new file mode 100644 (file)
index 0000000..baaa6b0
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * A journal of byte arrays. Provides the ability to write modify entries via {@link ByteBufWriter} and read them
+ * back via {@link ByteBufReader}.
+ */
+@NonNullByDefault
+public interface ByteBufJournal extends AutoCloseable {
+    /**
+     * Returns the journal writer.
+     *
+     * @return The journal writer.
+     */
+    ByteBufWriter writer();
+
+    /**
+     * Opens a new {@link ByteBufReader} reading all entries.
+     *
+     * @param index The index at which to start the reader.
+     * @return A new journal reader.
+     */
+    ByteBufReader openReader(long index);
+
+    /**
+     * Opens a new {@link ByteBufReader} reading only committed entries.
+     *
+     * @param index The index at which to start the reader.
+     * @return A new journal reader.
+     */
+    ByteBufReader openCommitsReader(long index);
+
+    @Override
+    void close();
+}
similarity index 59%
rename from atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java
rename to atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java
index 767e67fa4630864fb60fc4fa9d5ae4ce221b5064..cabd48d8bd7d8d428f91ad8d388ca869dd837d2f 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import io.netty.buffer.ByteBuf;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
- * A {@link JournalReader} traversing only committed entries.
+ * Support for serialization of {@link ByteBufJournal} entries.
  */
 @NonNullByDefault
-final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
-    CommitsSegmentJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
-        super(journal, segment);
-    }
+public interface ByteBufMapper<T> {
+    /**
+     * Converts an object into a series of bytes in a {@link ByteBuf}.
+     *
+     * @param obj the object
+     * @return resulting buffer
+     */
+    ByteBuf objectToBytes(T obj) ;
 
-    @Override
-    public <T> T tryNext(final EntryMapper<E, T> mapper) {
-        return getNextIndex() <= journal.getCommitIndex() ? super.tryNext(mapper) : null;
-    }
+    /**
+     * Converts the contents of a {@link ByteBuf} to an object.
+     *
+     * @param buf buffer to convert
+     * @return resulting object
+     */
+    T bytesToObject(ByteBuf buf);
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java
new file mode 100644 (file)
index 0000000..1ebe81e
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * A reader of {@link ByteBufJournal} entries.
+ */
+@NonNullByDefault
+public interface ByteBufReader extends AutoCloseable {
+    /**
+     * A journal entry processor. Responsible for transforming bytes into their internal representation.
+     *
+     * @param <T> Internal representation type
+     */
+    @FunctionalInterface
+    interface EntryMapper<T> {
+        /**
+         * Process an entry.
+         *
+         * @param index entry index
+         * @param bytes entry bytes
+         * @return resulting internal representation
+         */
+        T mapEntry(long index, ByteBuf bytes);
+    }
+
+    /**
+     * Returns the first index in the journal.
+     *
+     * @return The first index in the journal
+     */
+    long firstIndex();
+
+    /**
+     * Returns the next reader index.
+     *
+     * @return The next reader index
+     */
+    long nextIndex();
+
+    /**
+     * Try to move to the next binary data block
+     *
+     * @param entryMapper callback to be invoked on binary data
+     * @return processed binary data, or {@code null}
+     */
+    <T> @Nullable T tryNext(EntryMapper<T> entryMapper);
+
+    /**
+     * Resets the reader to the start.
+     */
+    void reset();
+
+    /**
+     * Resets the reader to the given index.
+     *
+     * @param index The index to which to reset the reader
+     */
+    void reset(long index);
+
+    @Override
+    void close();
+}
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java
new file mode 100644 (file)
index 0000000..7211a88
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * A writer of {@link ByteBufJournal} entries.
+ */
+@NonNullByDefault
+public interface ByteBufWriter {
+    /**
+     * Returns the last written index.
+     *
+     * @return The last written index
+     */
+    long lastIndex();
+
+    /**
+     * Returns the next index to be written.
+     *
+     * @return The next index to be written
+     */
+    long nextIndex();
+
+    /**
+     * Appends an entry to the journal.
+     *
+     * @param bytes Data block to append
+     * @return The index of appended data block
+     */
+    // FIXME: throws IOException
+    long append(ByteBuf bytes);
+
+    /**
+     * Commits entries up to the given index.
+     *
+     * @param index The index up to which to commit entries.
+     */
+    void commit(long index);
+
+    /**
+     * Resets the head of the journal to the given index.
+     *
+     * @param index The index to which to reset the head of the journal
+     */
+    // FIXME: reconcile with reader's reset and truncate()
+    // FIXME: throws IOException
+    void reset(long index);
+
+    /**
+     * Truncates the log to the given index.
+     *
+     * @param index The index to which to truncate the log.
+     */
+    // FIXME: reconcile with reset()
+    // FIXME: throws IOException
+    void truncate(long index);
+
+    /**
+     * Flushes written entries to disk.
+     */
+    // FIXME: throws IOException
+    void flush();
+}
index a3c6ea5366a14691b8c6e653e99dcfcbe223a73c..635f6248c44f5d585e6dd52a118618672a1458ab 100644 (file)
@@ -75,10 +75,10 @@ public interface JournalReader<E> extends AutoCloseable {
     /**
      * Try to move to the next entry.
      *
-     * @param mapper callback to be invoked for the entry
+     * @param entryMapper callback to be invoked for the entry
      * @return processed entry, or {@code null}
      */
-    <T> @Nullable T tryNext(EntryMapper<E, T> mapper);
+    <T> @Nullable T tryNext(EntryMapper<E, T> entryMapper);
 
     /**
      * Resets the reader to the start.
index bba89dfdc96b13921645d02217de9145dc840c96..aa4c0da18a9e59ea5b8cef851c992a63256b6143 100644 (file)
@@ -72,10 +72,9 @@ final class JournalSegmentReader {
     /**
      * Reads the next binary data block
      *
-     * @param index entry index
      * @return The binary data, or {@code null}
      */
-    @Nullable ByteBuf readBytes(final long index) {
+    @Nullable ByteBuf readBytes() {
         // Check if there is enough in the buffer remaining
         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
         if (remaining < 0) {
@@ -112,7 +111,7 @@ final class JournalSegmentReader {
         position += SegmentEntry.HEADER_BYTES + length;
 
         // rewind and return
-        return Unpooled.buffer(length).writeBytes(entryBuffer.rewind());
+        return Unpooled.wrappedBuffer(entryBuffer.rewind());
     }
 
     /**
index 63f5303ecc99ec7e04d8c7849cd502d77bee3082..dbf6aec214ffe1ef32f6263477272e12e4adc76f 100644 (file)
@@ -144,7 +144,7 @@ final class JournalSegmentWriter {
         reader.setPosition(JournalSegmentDescriptor.BYTES);
 
         while (index == 0 || nextIndex <= index) {
-            final var buf = reader.readBytes(nextIndex);
+            final var buf = reader.readBytes();
             if (buf == null) {
                 break;
             }
index a970882edfa1203830cb36a7b75df867af012433..ffdc9858271ba9d79ec598e5128d7cfea15e1aad 100644 (file)
@@ -19,6 +19,9 @@ package io.atomix.storage.journal;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -27,7 +30,7 @@ import java.nio.ByteBuffer;
 /**
  * Support for serialization of {@link Journal} entries.
  *
- * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead.
+ * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead.
  */
 @Deprecated(forRemoval = true, since="9.0.3")
 public interface JournalSerdes {
@@ -110,6 +113,26 @@ public interface JournalSerdes {
      */
     <T> T deserialize(final InputStream stream, final int bufferSize);
 
+    /**
+     * Returns a {@link ByteBufMapper} backed by this object.
+     *
+     * @return a {@link ByteBufMapper} backed by this object
+     */
+    default <T> ByteBufMapper<T> toMapper() {
+        return new ByteBufMapper<>() {
+            @Override
+            public ByteBuf objectToBytes(final T obj) {
+                return Unpooled.wrappedBuffer(serialize(obj));
+            }
+
+            @Override
+            public T bytesToObject(final ByteBuf buf) {
+                // FIXME: ByteBufUtil creates a copy -- we do not want to do that!
+                return deserialize(ByteBufUtil.getBytes(buf));
+            }
+        };
+    }
+
     /**
      * Creates a new {@link JournalSerdes} builder.
      *
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java
deleted file mode 100644 (file)
index eff9af8..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package io.atomix.storage.journal;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
-
-/**
- * Support for serialization of {@link Journal} entries.
- */
-public interface JournalSerializer<T> {
-
-    /**
-     * Serializes given object to byte array.
-     *
-     * @param obj Object to serialize
-     * @return serialized bytes as {@link ByteBuf}
-     */
-    ByteBuf serialize(T obj) ;
-
-    /**
-     * Deserializes given byte array to Object.
-     *
-     * @param buf serialized bytes as {@link ByteBuf}
-     * @return deserialized Object
-     */
-    T deserialize(final ByteBuf buf);
-
-    static <E> JournalSerializer<E> wrap(final JournalSerdes serdes) {
-        return new JournalSerializer<>() {
-            @Override
-            public ByteBuf serialize(final E obj) {
-                return Unpooled.wrappedBuffer(serdes.serialize(obj));
-            }
-
-            @Override
-            public E deserialize(final ByteBuf buf) {
-                return serdes.deserialize(ByteBufUtil.getBytes(buf));
-            }
-        };
-    }
-}
index 064fd019ecb6459f245d40bb2711976ab668961a..ba7c5821aa0d014c5328b8903234fa4343dd0e5a 100644 (file)
@@ -57,6 +57,7 @@ public interface JournalWriter<E> {
      *
      * @param index the index to which to reset the head of the journal
      */
+    // FIXME: reconcile with reader's reset and truncate()
     void reset(long index);
 
     /**
@@ -64,6 +65,7 @@ public interface JournalWriter<E> {
      *
      * @param index The index to which to truncate the log.
      */
+    // FIXME: reconcile with reset()
     void truncate(long index);
 
     /**
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
new file mode 100644 (file)
index 0000000..3ae64ea
--- /dev/null
@@ -0,0 +1,596 @@
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.BiFunction;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ByteBufJournal} Implementation.
+ */
+public final class SegmentedByteBufJournal implements ByteBufJournal {
+    private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
+    private static final int SEGMENT_BUFFER_FACTOR = 3;
+
+    private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
+    private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
+    private final String name;
+    private final StorageLevel storageLevel;
+    private final File directory;
+    private final int maxSegmentSize;
+    private final int maxEntrySize;
+    private final double indexDensity;
+    private final boolean flushOnCommit;
+    private final @NonNull ByteBufWriter writer;
+
+    private JournalSegment currentSegment;
+    private volatile long commitIndex;
+
+    public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
+            final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
+        this.name = requireNonNull(name, "name cannot be null");
+        this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
+        this.directory = requireNonNull(directory, "directory cannot be null");
+        this.maxSegmentSize = maxSegmentSize;
+        this.maxEntrySize = maxEntrySize;
+        this.indexDensity = indexDensity;
+        this.flushOnCommit = flushOnCommit;
+        open();
+        writer = new SegmentedByteBufWriter(this);
+    }
+
+    /**
+     * Returns the total size of the journal.
+     *
+     * @return the total size of the journal
+     */
+    public long size() {
+        return segments.values().stream()
+            .mapToLong(segment -> {
+                try {
+                    return segment.file().size();
+                } catch (IOException e) {
+                    throw new StorageException(e);
+                }
+            })
+            .sum();
+    }
+
+    @Override
+    public ByteBufWriter writer() {
+        return writer;
+    }
+
+    @Override
+    public ByteBufReader openReader(final long index) {
+        return openReader(index, SegmentedByteBufReader::new);
+    }
+
+    @NonNullByDefault
+    private ByteBufReader openReader(final long index,
+            final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
+        final var reader = constructor.apply(this, segment(index));
+        reader.reset(index);
+        readers.add(reader);
+        return reader;
+    }
+
+    @Override
+    public ByteBufReader openCommitsReader(final long index) {
+        return openReader(index, SegmentedCommitsByteBufReader::new);
+    }
+
+    /**
+     * Opens the segments.
+     */
+    private synchronized void open() {
+        // Load existing log segments from disk.
+        for (var segment : loadSegments()) {
+            segments.put(segment.firstIndex(), segment);
+        }
+        // If a segment doesn't already exist, create an initial segment starting at index 1.
+        if (segments.isEmpty()) {
+            currentSegment = createSegment(1, 1);
+            segments.put(1L, currentSegment);
+        }  else {
+            currentSegment = segments.lastEntry().getValue();
+        }
+    }
+
+    /**
+     * Asserts that the manager is open.
+     *
+     * @throws IllegalStateException if the segment manager is not open
+     */
+    private void assertOpen() {
+        checkState(currentSegment != null, "journal not open");
+    }
+
+    /**
+     * Asserts that enough disk space is available to allocate a new segment.
+     */
+    private void assertDiskSpace() {
+        if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
+            throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
+        }
+    }
+
+    /**
+     * Resets the current segment, creating a new segment if necessary.
+     */
+    private synchronized void resetCurrentSegment() {
+        final var lastSegment = lastSegment();
+        if (lastSegment == null) {
+            currentSegment = createSegment(1, 1);
+            segments.put(1L, currentSegment);
+        } else {
+            currentSegment = lastSegment;
+        }
+    }
+
+    /**
+     * Resets and returns the first segment in the journal.
+     *
+     * @param index the starting index of the journal
+     * @return the first segment
+     */
+    JournalSegment resetSegments(final long index) {
+        assertOpen();
+
+        // If the index already equals the first segment index, skip the reset.
+        final var firstSegment = firstSegment();
+        if (index == firstSegment.firstIndex()) {
+            return firstSegment;
+        }
+
+        segments.values().forEach(JournalSegment::delete);
+        segments.clear();
+
+        currentSegment = createSegment(1, index);
+        segments.put(index, currentSegment);
+        return currentSegment;
+    }
+
+    /**
+     * Returns the first segment in the log.
+     *
+     * @throws IllegalStateException if the segment manager is not open
+     */
+    JournalSegment firstSegment() {
+        assertOpen();
+        final var firstEntry = segments.firstEntry();
+        return firstEntry != null ? firstEntry.getValue() : nextSegment();
+    }
+
+    /**
+     * Returns the last segment in the log.
+     *
+     * @throws IllegalStateException if the segment manager is not open
+     */
+    JournalSegment lastSegment() {
+        assertOpen();
+        final var lastEntry = segments.lastEntry();
+        return lastEntry != null ? lastEntry.getValue() : nextSegment();
+    }
+
+    /**
+     * Creates and returns the next segment.
+     *
+     * @return The next segment.
+     * @throws IllegalStateException if the segment manager is not open
+     */
+    synchronized JournalSegment nextSegment() {
+        assertOpen();
+        assertDiskSpace();
+
+        final var index = currentSegment.lastIndex() + 1;
+        final var lastSegment = lastSegment();
+        currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
+        segments.put(index, currentSegment);
+        return currentSegment;
+    }
+
+    /**
+     * Returns the segment following the segment with the given ID.
+     *
+     * @param index The segment index with which to look up the next segment.
+     * @return The next segment for the given index.
+     */
+    JournalSegment nextSegment(final long index) {
+        final var higherEntry = segments.higherEntry(index);
+        return higherEntry != null ? higherEntry.getValue() : null;
+    }
+
+    /**
+     * Returns the segment for the given index.
+     *
+     * @param index The index for which to return the segment.
+     * @throws IllegalStateException if the segment manager is not open
+     */
+    synchronized JournalSegment segment(final long index) {
+        assertOpen();
+        // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
+        if (currentSegment != null && index > currentSegment.firstIndex()) {
+            return currentSegment;
+        }
+
+        // If the index is in another segment, get the entry with the next lowest first index.
+        final var segment = segments.floorEntry(index);
+        return segment != null ? segment.getValue() : firstSegment();
+    }
+
+    /**
+     * Removes a segment.
+     *
+     * @param segment The segment to remove.
+     */
+    synchronized void removeSegment(final JournalSegment segment) {
+        segments.remove(segment.firstIndex());
+        segment.delete();
+        resetCurrentSegment();
+    }
+
+    /**
+     * Creates a new segment.
+     */
+    JournalSegment createSegment(final long id, final long index) {
+        final JournalSegmentFile file;
+        try {
+            file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
+                .withId(id)
+                .withIndex(index)
+                .withMaxSegmentSize(maxSegmentSize)
+                // FIXME: propagate maxEntries
+                .withMaxEntries(Integer.MAX_VALUE)
+                .withUpdated(System.currentTimeMillis())
+                .build());
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+
+        final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
+        LOG.debug("Created segment: {}", segment);
+        return segment;
+    }
+
+    /**
+     * Loads all segments from disk.
+     *
+     * @return A collection of segments for the log.
+     */
+    protected Collection<JournalSegment> loadSegments() {
+        // Ensure log directories are created.
+        directory.mkdirs();
+
+        final var segmentsMap = new TreeMap<Long, JournalSegment>();
+
+        // Iterate through all files in the log directory.
+        for (var file : directory.listFiles(File::isFile)) {
+
+            // If the file looks like a segment file, attempt to load the segment.
+            if (JournalSegmentFile.isSegmentFile(name, file)) {
+                final JournalSegmentFile segmentFile;
+                try {
+                    segmentFile = JournalSegmentFile.openExisting(file.toPath());
+                } catch (IOException e) {
+                    throw new StorageException(e);
+                }
+
+                // Load the segment.
+                LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
+
+                // Add the segment to the segments list.
+                final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
+                segments.put(segment.firstIndex(), segment);
+            }
+        }
+
+        // Verify that all the segments in the log align with one another.
+        JournalSegment previousSegment = null;
+        boolean corrupted = false;
+        for (var iterator =  segmentsMap.entrySet().iterator(); iterator.hasNext();  ) {
+            final var segment = iterator.next().getValue();
+            if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
+                LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
+                    previousSegment.file().path());
+                corrupted = true;
+            }
+            if (corrupted) {
+                segment.delete();
+                iterator.remove();
+            }
+            previousSegment = segment;
+        }
+
+        return segmentsMap.values();
+    }
+
+    /**
+     * Resets journal readers to the given head.
+     *
+     * @param index The index at which to reset readers.
+     */
+    void resetHead(final long index) {
+        for (var reader : readers) {
+            if (reader.nextIndex() < index) {
+                reader.reset(index);
+            }
+        }
+    }
+
+    /**
+     * Resets journal readers to the given tail.
+     *
+     * @param index The index at which to reset readers.
+     */
+    void resetTail(final long index) {
+        for (var reader : readers) {
+            if (reader.nextIndex() >= index) {
+                reader.reset(index);
+            }
+        }
+    }
+
+    void closeReader(final SegmentedByteBufReader reader) {
+        readers.remove(reader);
+    }
+
+    /**
+     * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
+     *
+     * @param index the index from which to remove segments
+     * @return indicates whether a segment can be removed from the journal
+     */
+    public boolean isCompactable(final long index) {
+        final var segmentEntry = segments.floorEntry(index);
+        return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
+    }
+
+    /**
+     * Returns the index of the last segment in the log.
+     *
+     * @param index the compaction index
+     * @return the starting index of the last segment in the log
+     */
+    public long getCompactableIndex(final long index) {
+        final var segmentEntry = segments.floorEntry(index);
+        return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
+    }
+
+    /**
+     * Compacts the journal up to the given index.
+     * <p>
+     * The semantics of compaction are not specified by this interface.
+     *
+     * @param index The index up to which to compact the journal.
+     */
+    public void compact(final long index) {
+        final var segmentEntry = segments.floorEntry(index);
+        if (segmentEntry != null) {
+            final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
+            if (!compactSegments.isEmpty()) {
+                LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
+                compactSegments.values().forEach(JournalSegment::delete);
+                compactSegments.clear();
+                resetHead(segmentEntry.getValue().firstIndex());
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (currentSegment != null) {
+            currentSegment = null;
+            segments.values().forEach(JournalSegment::close);
+            segments.clear();
+        }
+    }
+
+    /**
+     * Returns whether {@code flushOnCommit} is enabled for the log.
+     *
+     * @return Indicates whether {@code flushOnCommit} is enabled for the log.
+     */
+    boolean isFlushOnCommit() {
+        return flushOnCommit;
+    }
+
+    /**
+     * Updates commit index to the given value.
+     *
+     * @param index The index value.
+     */
+    void setCommitIndex(final long index) {
+        commitIndex = index;
+    }
+
+    /**
+     * Returns the journal last commit index.
+     *
+     * @return The journal last commit index.
+     */
+    long getCommitIndex() {
+        return commitIndex;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Segmented byte journal builder.
+     */
+    public static final class Builder {
+        private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
+        private static final String DEFAULT_NAME = "atomix";
+        private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
+        private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
+        private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
+        private static final double DEFAULT_INDEX_DENSITY = .005;
+
+        private String name = DEFAULT_NAME;
+        private StorageLevel storageLevel = StorageLevel.DISK;
+        private File directory = new File(DEFAULT_DIRECTORY);
+        private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
+        private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+        private double indexDensity = DEFAULT_INDEX_DENSITY;
+        private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
+
+        private Builder() {
+            // on purpose
+        }
+
+        /**
+         * Sets the journal name.
+         *
+         * @param name The journal name.
+         * @return The builder instance
+         */
+        public Builder withName(final String name) {
+            this.name = requireNonNull(name, "name cannot be null");
+            return this;
+        }
+
+        /**
+         * Sets the storage level.
+         *
+         * @param storageLevel The storage level.
+         * @return The builder instance
+         */
+        public Builder withStorageLevel(final StorageLevel storageLevel) {
+            this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
+            return this;
+        }
+
+        /**
+         * Sets the journal directory.
+         *
+         * @param directory The log directory.
+         * @return The builder instance
+         * @throws NullPointerException If the {@code directory} is {@code null}
+         */
+        public Builder withDirectory(final String directory) {
+            return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
+        }
+
+        /**
+         * Sets the journal directory
+         *
+         * @param directory The log directory.
+         * @return The builder instance
+         * @throws NullPointerException If the {@code directory} is {@code null}
+         */
+        public Builder withDirectory(final File directory) {
+            this.directory = requireNonNull(directory, "directory cannot be null");
+            return this;
+        }
+
+        /**
+         * Sets the maximum segment size in bytes.
+         * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
+         *
+         * @param maxSegmentSize The maximum segment size in bytes.
+         * @return The builder instance
+         * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
+         */
+        public Builder withMaxSegmentSize(final int maxSegmentSize) {
+            checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
+                "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
+            this.maxSegmentSize = maxSegmentSize;
+            return this;
+        }
+
+        /**
+         * Sets the maximum entry size in bytes.
+         *
+         * @param maxEntrySize the maximum entry size in bytes
+         * @return the builder instance
+         * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
+         */
+        public Builder withMaxEntrySize(final int maxEntrySize) {
+            checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
+            this.maxEntrySize = maxEntrySize;
+            return this;
+        }
+
+        /**
+         * Sets the journal index density.
+         * <p>
+         * The index density is the frequency at which the position of entries written to the journal will be
+         * recorded in an in-memory index for faster seeking.
+         *
+         * @param indexDensity the index density
+         * @return the builder instance
+         * @throws IllegalArgumentException if the density is not between 0 and 1
+         */
+        public Builder withIndexDensity(final double indexDensity) {
+            checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
+            this.indexDensity = indexDensity;
+            return this;
+        }
+
+        /**
+         * Enables flushing buffers to disk when entries are committed to a segment.
+         * <p>
+         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
+         * an entry is committed in a given segment.
+         *
+         * @return The builder instance
+         */
+        public Builder withFlushOnCommit() {
+            return withFlushOnCommit(true);
+        }
+
+        /**
+         * Sets whether to flush buffers to disk when entries are committed to a segment.
+         * <p>
+         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
+         * an entry is committed in a given segment.
+         *
+         * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
+         * @return The builder instance
+         */
+        public Builder withFlushOnCommit(final boolean flushOnCommit) {
+            this.flushOnCommit = flushOnCommit;
+            return this;
+        }
+
+        /**
+         * Build the {@link SegmentedByteBufJournal}.
+         *
+         * @return {@link SegmentedByteBufJournal} instance built.
+         */
+        public SegmentedByteBufJournal build() {
+            return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
+                indexDensity, flushOnCommit);
+        }
+    }
+}
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java
new file mode 100644 (file)
index 0000000..d164676
--- /dev/null
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * A {@link ByteBufReader} implementation.
+ */
+sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader {
+    final @NonNull SegmentedByteBufJournal journal;
+
+    private JournalSegment currentSegment;
+    private JournalSegmentReader currentReader;
+    private long nextIndex;
+
+    SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
+        this.journal = requireNonNull(journal);
+        currentSegment = requireNonNull(segment);
+        currentReader = segment.createReader();
+        nextIndex = currentSegment.firstIndex();
+    }
+
+    @Override
+    public final long firstIndex() {
+        return journal.firstSegment().firstIndex();
+    }
+
+    @Override
+    public final long nextIndex() {
+        return nextIndex;
+    }
+
+    @Override
+    public final void reset() {
+        currentReader.close();
+        currentSegment = journal.firstSegment();
+        currentReader = currentSegment.createReader();
+        nextIndex = currentSegment.firstIndex();
+    }
+
+    @Override
+    public final void reset(final long index) {
+        // If the current segment is not open, it has been replaced. Reset the segments.
+        if (!currentSegment.isOpen()) {
+            reset();
+        }
+        if (index < nextIndex) {
+            rewind(index);
+        } else if (index > nextIndex) {
+            forwardTo(index);
+        } else {
+            resetCurrentReader(index);
+        }
+    }
+
+    private void resetCurrentReader(final long index) {
+        final var position = currentSegment.lookup(index - 1);
+        if (position != null) {
+            nextIndex = position.index();
+            currentReader.setPosition(position.position());
+        } else {
+            nextIndex = currentSegment.firstIndex();
+            currentReader.setPosition(JournalSegmentDescriptor.BYTES);
+        }
+        forwardTo(index);
+    }
+
+    /**
+     * Rewinds the journal to the given index.
+     */
+    private void rewind(final long index) {
+        if (currentSegment.firstIndex() >= index) {
+            final var segment = journal.segment(index - 1);
+            if (segment != null) {
+                currentReader.close();
+                currentSegment = segment;
+                currentReader = currentSegment.createReader();
+            }
+        }
+        resetCurrentReader(index);
+    }
+
+    private void forwardTo(final long index) {
+        while (nextIndex < index && tryAdvance(nextIndex) != null) {
+            // No-op -- nextIndex value is updated in tryAdvance()
+        }
+    }
+
+    @Override
+    public final <T> T tryNext(final EntryMapper<T> entryMapper) {
+        final var index = nextIndex;
+        final var bytes = tryAdvance(index);
+        return bytes == null ? null : entryMapper.mapEntry(index, bytes);
+    }
+
+    /**
+     * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
+     * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
+     *
+     * <p>
+     * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
+     * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
+     *
+     * @param index next index
+     * @return Entry bytes, or {@code null}
+     */
+    ByteBuf tryAdvance(final long index) {
+        var buf = currentReader.readBytes();
+        if (buf == null) {
+            final var nextSegment = journal.nextSegment(currentSegment.firstIndex());
+            if (nextSegment == null || nextSegment.firstIndex() != index) {
+                return null;
+            }
+            currentReader.close();
+            currentSegment = nextSegment;
+            currentReader = currentSegment.createReader();
+            buf = currentReader.readBytes();
+            if (buf == null) {
+                return null;
+            }
+        }
+        nextIndex = index + 1;
+        return buf;
+    }
+
+    @Override
+    public final void close() {
+        currentReader.close();
+        journal.closeReader(this);
+    }
+}
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java
new file mode 100644 (file)
index 0000000..7e92815
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * A {@link ByteBufWriter} implementation.
+ */
+final class SegmentedByteBufWriter implements ByteBufWriter {
+    private final SegmentedByteBufJournal journal;
+
+    private JournalSegment currentSegment;
+    private JournalSegmentWriter currentWriter;
+
+    SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
+        this.journal = requireNonNull(journal);
+        currentSegment = journal.lastSegment();
+        currentWriter = currentSegment.acquireWriter();
+    }
+
+    @Override
+    public long lastIndex() {
+        return currentWriter.getLastIndex();
+    }
+
+    @Override
+    public long nextIndex() {
+        return currentWriter.getNextIndex();
+    }
+
+    @Override
+    public void reset(final long index) {
+        if (index > currentSegment.firstIndex()) {
+            currentSegment.releaseWriter();
+            currentSegment = journal.resetSegments(index);
+            currentWriter = currentSegment.acquireWriter();
+        } else {
+            truncate(index - 1);
+        }
+        journal.resetHead(index);
+    }
+
+    @Override
+    public void commit(final long index) {
+        if (index > journal.getCommitIndex()) {
+            journal.setCommitIndex(index);
+            if (journal.isFlushOnCommit()) {
+                flush();
+            }
+        }
+    }
+
+    @Override
+    public long append(final ByteBuf buf) {
+        var index = currentWriter.append(buf);
+        if (index != null) {
+            return index;
+        }
+        //  Slow path: we do not have enough capacity
+        currentWriter.flush();
+        currentSegment.releaseWriter();
+        currentSegment = journal.nextSegment();
+        currentWriter = currentSegment.acquireWriter();
+        return verifyNotNull(currentWriter.append(buf));
+    }
+
+    @Override
+    public void truncate(final long index) {
+        if (index < journal.getCommitIndex()) {
+            throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+        }
+
+        // Delete all segments with first indexes greater than the given index.
+        while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
+            currentSegment.releaseWriter();
+            journal.removeSegment(currentSegment);
+            currentSegment = journal.lastSegment();
+            currentWriter = currentSegment.acquireWriter();
+        }
+
+        // Truncate the current index.
+        currentWriter.truncate(index);
+
+        // Reset segment readers.
+        journal.resetTail(index + 1);
+    }
+
+    @Override
+    public void flush() {
+        currentWriter.flush();
+    }
+}
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java
new file mode 100644 (file)
index 0000000..43fae62
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * A {@link ByteBufReader} traversing only committed entries.
+ */
+final class SegmentedCommitsByteBufReader extends SegmentedByteBufReader {
+    SegmentedCommitsByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
+        super(journal, segment);
+    }
+
+    @Override
+    ByteBuf tryAdvance(final long index) {
+        return index <= journal.getCommitIndex() ? super.tryAdvance(index) : null;
+    }
+}
\ No newline at end of file
index 1ae77fa35114ed62678dd14bf1c7926c2800e09e..cd074926925eae80cea3c9ad2e39e85e656169fa 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package io.atomix.storage.journal;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Segmented journal.
  */
 public final class SegmentedJournal<E> implements Journal<E> {
-  /**
-   * Returns a new Raft log builder.
-   *
-   * @return A new Raft log builder.
-   */
-  public static <E> Builder<E> builder() {
-    return new Builder<>();
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
-  private static final int SEGMENT_BUFFER_FACTOR = 3;
-
-  private final String name;
-  private final StorageLevel storageLevel;
-  private final File directory;
-  private final JournalSerializer<E> serializer;
-  private final int maxSegmentSize;
-  private final int maxEntrySize;
-  private final int maxEntriesPerSegment;
-  private final double indexDensity;
-  private final boolean flushOnCommit;
-  private final SegmentedJournalWriter<E> writer;
-  private volatile long commitIndex;
-
-  private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
-  private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
-  private JournalSegment currentSegment;
-
-  private volatile boolean open = true;
-
-  public SegmentedJournal(
-      String name,
-      StorageLevel storageLevel,
-      File directory,
-      JournalSerdes namespace,
-      int maxSegmentSize,
-      int maxEntrySize,
-      int maxEntriesPerSegment,
-      double indexDensity,
-      boolean flushOnCommit) {
-    this.name = requireNonNull(name, "name cannot be null");
-    this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
-    this.directory = requireNonNull(directory, "directory cannot be null");
-    this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
-    this.maxSegmentSize = maxSegmentSize;
-    this.maxEntrySize = maxEntrySize;
-    this.maxEntriesPerSegment = maxEntriesPerSegment;
-    this.indexDensity = indexDensity;
-    this.flushOnCommit = flushOnCommit;
-    open();
-    this.writer = new SegmentedJournalWriter<>(this);
-  }
-
-  /**
-   * Returns the segment file name prefix.
-   *
-   * @return The segment file name prefix.
-   */
-  public String name() {
-    return name;
-  }
-
-  /**
-   * Returns the storage directory.
-   * <p>
-   * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
-   * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
-   * when the log is opened.
-   *
-   * @return The storage directory.
-   */
-  public File directory() {
-    return directory;
-  }
-
-  /**
-   * Returns the storage level.
-   * <p>
-   * The storage level dictates how entries within individual journal segments should be stored.
-   *
-   * @return The storage level.
-   */
-  public StorageLevel storageLevel() {
-    return storageLevel;
-  }
-
-  /**
-   * Returns the maximum journal segment size.
-   * <p>
-   * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
-   *
-   * @return The maximum segment size in bytes.
-   */
-  public int maxSegmentSize() {
-    return maxSegmentSize;
-  }
-
-  /**
-   * Returns the maximum journal entry size.
-   * <p>
-   * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
-   *
-   * @return the maximum entry size in bytes
-   */
-  public int maxEntrySize() {
-    return maxEntrySize;
-  }
-
-  /**
-   * Returns the maximum number of entries per segment.
-   * <p>
-   * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
-   * in a journal.
-   *
-   * @return The maximum number of entries per segment.
-   * @deprecated since 3.0.2
-   */
-  @Deprecated
-  public int maxEntriesPerSegment() {
-    return maxEntriesPerSegment;
-  }
-
-  /**
-   * Returns the collection of journal segments.
-   *
-   * @return the collection of journal segments
-   */
-  public Collection<JournalSegment> segments() {
-    return segments.values();
-  }
-
-  /**
-   * Returns the collection of journal segments with indexes greater than the given index.
-   *
-   * @param index the starting index
-   * @return the journal segments starting with indexes greater than or equal to the given index
-   */
-  public Collection<JournalSegment> segments(long index) {
-    return segments.tailMap(index).values();
-  }
-
-  /**
-   * Returns serializer instance.
-   *
-   * @return serializer instance
-   */
-  JournalSerializer<E> serializer() {
-    return serializer;
-  }
-
-  /**
-   * Returns the total size of the journal.
-   *
-   * @return the total size of the journal
-   */
-  public long size() {
-    return segments.values().stream()
-        .mapToLong(segment -> {
-          try {
-            return segment.file().size();
-          } catch (IOException e) {
-            throw new StorageException(e);
-          }
-        })
-        .sum();
-  }
-
-  @Override
-  public JournalWriter<E> writer() {
-    return writer;
-  }
-
-  @Override
-  public JournalReader<E> openReader(long index) {
-    return openReader(index, JournalReader.Mode.ALL);
-  }
-
-  /**
-   * Opens a new Raft log reader with the given reader mode.
-   *
-   * @param index The index from which to begin reading entries.
-   * @param mode The mode in which to read entries.
-   * @return The Raft log reader.
-   */
-  @Override
-  public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
-    final var segment = getSegment(index);
-    final var reader = switch (mode) {
-      case ALL -> new SegmentedJournalReader<>(this, segment);
-      case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
-    };
-
-    // Forward reader to specified index
-    long next = reader.getNextIndex();
-    while (index > next && reader.tryAdvance()) {
-        next = reader.getNextIndex();
+    private final AtomicBoolean open = new AtomicBoolean(true);
+    private final SegmentedByteBufJournal journal;
+    private final SegmentedJournalWriter<E> writer;
+    private final ByteBufMapper<E> mapper;
+
+    public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
+        this.journal = requireNonNull(journal, "journal is required");
+        this.mapper = requireNonNull(mapper, "mapper cannot be null");
+        writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
     }
 
-    readers.add(reader);
-    return reader;
-  }
-
-  /**
-   * Opens the segments.
-   */
-  private synchronized void open() {
-    // Load existing log segments from disk.
-    for (var segment : loadSegments()) {
-      segments.put(segment.firstIndex(), segment);
+    @Override
+    public JournalWriter<E> writer() {
+        return writer;
     }
 
-    // If a segment doesn't already exist, create an initial segment starting at index 1.
-    if (!segments.isEmpty()) {
-      currentSegment = segments.lastEntry().getValue();
-    } else {
-      currentSegment = createSegment(1, 1);
-      segments.put(1L, currentSegment);
-    }
-  }
-
-  /**
-   * Asserts that the manager is open.
-   *
-   * @throws IllegalStateException if the segment manager is not open
-   */
-  private void assertOpen() {
-    checkState(currentSegment != null, "journal not open");
-  }
-
-  /**
-   * Asserts that enough disk space is available to allocate a new segment.
-   */
-  private void assertDiskSpace() {
-    if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
-      throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
-    }
-  }
-
-  /**
-   * Resets the current segment, creating a new segment if necessary.
-   */
-  private synchronized void resetCurrentSegment() {
-    final var lastSegment = getLastSegment();
-    if (lastSegment == null) {
-      currentSegment = createSegment(1, 1);
-      segments.put(1L, currentSegment);
-    } else {
-      currentSegment = lastSegment;
-    }
-  }
-
-  /**
-   * Resets and returns the first segment in the journal.
-   *
-   * @param index the starting index of the journal
-   * @return the first segment
-   */
-  JournalSegment resetSegments(long index) {
-    assertOpen();
-
-    // If the index already equals the first segment index, skip the reset.
-    final var firstSegment = getFirstSegment();
-    if (index == firstSegment.firstIndex()) {
-      return firstSegment;
-    }
-
-    segments.values().forEach(JournalSegment::delete);
-    segments.clear();
-
-    currentSegment = createSegment(1, index);
-    segments.put(index, currentSegment);
-    return currentSegment;
-  }
-
-  /**
-   * Returns the first segment in the log.
-   *
-   * @throws IllegalStateException if the segment manager is not open
-   */
-  JournalSegment getFirstSegment() {
-    assertOpen();
-    Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
-    return segment != null ? segment.getValue() : null;
-  }
-
-  /**
-   * Returns the last segment in the log.
-   *
-   * @throws IllegalStateException if the segment manager is not open
-   */
-  JournalSegment getLastSegment() {
-    assertOpen();
-    Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
-    return segment != null ? segment.getValue() : null;
-  }
-
-  /**
-   * Creates and returns the next segment.
-   *
-   * @return The next segment.
-   * @throws IllegalStateException if the segment manager is not open
-   */
-  synchronized JournalSegment getNextSegment() {
-    assertOpen();
-    assertDiskSpace();
-
-    final var index = currentSegment.lastIndex() + 1;
-    final var lastSegment = getLastSegment();
-    currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
-    segments.put(index, currentSegment);
-    return currentSegment;
-  }
-
-  /**
-   * Returns the segment following the segment with the given ID.
-   *
-   * @param index The segment index with which to look up the next segment.
-   * @return The next segment for the given index.
-   */
-  JournalSegment getNextSegment(long index) {
-    Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
-    return nextSegment != null ? nextSegment.getValue() : null;
-  }
-
-  /**
-   * Returns the segment for the given index.
-   *
-   * @param index The index for which to return the segment.
-   * @throws IllegalStateException if the segment manager is not open
-   */
-  synchronized JournalSegment getSegment(long index) {
-    assertOpen();
-    // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
-    if (currentSegment != null && index > currentSegment.firstIndex()) {
-      return currentSegment;
-    }
-
-    // If the index is in another segment, get the entry with the next lowest first index.
-    Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
-    if (segment != null) {
-      return segment.getValue();
-    }
-    return getFirstSegment();
-  }
-
-  /**
-   * Removes a segment.
-   *
-   * @param segment The segment to remove.
-   */
-  synchronized void removeSegment(JournalSegment segment) {
-    segments.remove(segment.firstIndex());
-    segment.delete();
-    resetCurrentSegment();
-  }
-
-  /**
-   * Creates a new segment.
-   */
-  JournalSegment createSegment(long id, long index) {
-    final JournalSegmentFile file;
-    try {
-      file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
-          .withId(id)
-          .withIndex(index)
-          .withMaxSegmentSize(maxSegmentSize)
-          .withMaxEntries(maxEntriesPerSegment)
-          .withUpdated(System.currentTimeMillis())
-          .build());
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
-
-    final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
-    LOG.debug("Created segment: {}", segment);
-    return segment;
-  }
-
-  /**
-   * Loads all segments from disk.
-   *
-   * @return A collection of segments for the log.
-   */
-  protected Collection<JournalSegment> loadSegments() {
-    // Ensure log directories are created.
-    directory.mkdirs();
-
-    final var segments = new TreeMap<Long, JournalSegment>();
-
-    // Iterate through all files in the log directory.
-    for (var file : directory.listFiles(File::isFile)) {
-
-      // If the file looks like a segment file, attempt to load the segment.
-      if (JournalSegmentFile.isSegmentFile(name, file)) {
-        final JournalSegmentFile segmentFile;
-        try {
-          segmentFile = JournalSegmentFile.openExisting(file.toPath());
-        } catch (IOException e) {
-          throw new StorageException(e);
-        }
-
-        // Load the segment.
-        LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
-
-        // Add the segment to the segments list.
-        final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
-        segments.put(segment.firstIndex(), segment);
-      }
-    }
-
-    // Verify that all the segments in the log align with one another.
-    JournalSegment previousSegment = null;
-    boolean corrupted = false;
-    final var iterator = segments.entrySet().iterator();
-    while (iterator.hasNext()) {
-      final var segment = iterator.next().getValue();
-      if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
-        LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
-            previousSegment.file().path());
-        corrupted = true;
-      }
-      if (corrupted) {
-        segment.delete();
-        iterator.remove();
-      }
-      previousSegment = segment;
-    }
-
-    return segments.values();
-  }
-
-  /**
-   * Resets journal readers to the given head.
-   *
-   * @param index The index at which to reset readers.
-   */
-  void resetHead(long index) {
-    for (var reader : readers) {
-      if (reader.getNextIndex() < index) {
-        reader.reset(index);
-      }
-    }
-  }
-
-  /**
-   * Resets journal readers to the given tail.
-   *
-   * @param index The index at which to reset readers.
-   */
-  void resetTail(long index) {
-    for (var reader : readers) {
-      if (reader.getNextIndex() >= index) {
-        reader.reset(index);
-      }
-    }
-  }
-
-  void closeReader(SegmentedJournalReader<E> reader) {
-    readers.remove(reader);
-  }
-
-  @Override
-  public boolean isOpen() {
-    return open;
-  }
-
-  /**
-   * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
-   *
-   * @param index the index from which to remove segments
-   * @return indicates whether a segment can be removed from the journal
-   */
-  public boolean isCompactable(long index) {
-    Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
-    return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
-  }
-
-  /**
-   * Returns the index of the last segment in the log.
-   *
-   * @param index the compaction index
-   * @return the starting index of the last segment in the log
-   */
-  public long getCompactableIndex(long index) {
-    Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
-    return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
-  }
-
-  /**
-   * Compacts the journal up to the given index.
-   * <p>
-   * The semantics of compaction are not specified by this interface.
-   *
-   * @param index The index up to which to compact the journal.
-   */
-  public void compact(long index) {
-    final var segmentEntry = segments.floorEntry(index);
-    if (segmentEntry != null) {
-      final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
-      if (!compactSegments.isEmpty()) {
-        LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
-        compactSegments.values().forEach(JournalSegment::delete);
-        compactSegments.clear();
-        resetHead(segmentEntry.getValue().firstIndex());
-      }
-    }
-  }
-
-  @Override
-  public void close() {
-    segments.values().forEach(JournalSegment::close);
-    currentSegment = null;
-    open = false;
-  }
-
-  /**
-   * Returns whether {@code flushOnCommit} is enabled for the log.
-   *
-   * @return Indicates whether {@code flushOnCommit} is enabled for the log.
-   */
-  boolean isFlushOnCommit() {
-    return flushOnCommit;
-  }
-
-  /**
-   * Commits entries up to the given index.
-   *
-   * @param index The index up to which to commit entries.
-   */
-  void setCommitIndex(long index) {
-    this.commitIndex = index;
-  }
-
-  /**
-   * Returns the Raft log commit index.
-   *
-   * @return The Raft log commit index.
-   */
-  long getCommitIndex() {
-    return commitIndex;
-  }
-
-  /**
-   * Raft log builder.
-   */
-  public static final class Builder<E> {
-    private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
-    private static final String DEFAULT_NAME = "atomix";
-    private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
-    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
-    private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
-    private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
-    private static final double DEFAULT_INDEX_DENSITY = .005;
-
-    private String name = DEFAULT_NAME;
-    private StorageLevel storageLevel = StorageLevel.DISK;
-    private File directory = new File(DEFAULT_DIRECTORY);
-    private JournalSerdes namespace;
-    private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
-    private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
-    private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
-    private double indexDensity = DEFAULT_INDEX_DENSITY;
-    private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
-
-    Builder() {
-      // Hidden on purpose
+    @Override
+    public JournalReader<E> openReader(final long index) {
+        return openReader(index, JournalReader.Mode.ALL);
     }
 
     /**
-     * Sets the storage name.
+     * Opens a new journal reader with the given reader mode.
      *
-     * @param name The storage name.
-     * @return The storage builder.
+     * @param index The index from which to begin reading entries.
+     * @param mode The mode in which to read entries.
+     * @return The journal reader.
      */
-    public Builder<E> withName(String name) {
-      this.name = requireNonNull(name, "name cannot be null");
-      return this;
+    @Override
+    public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
+        final var byteReader = switch (mode) {
+            case ALL -> journal.openReader(index);
+            case COMMITS -> journal.openCommitsReader(index);
+        };
+        return new SegmentedJournalReader<>(byteReader, mapper);
     }
 
-    /**
-     * Sets the log storage level, returning the builder for method chaining.
-     * <p>
-     * The storage level indicates how individual entries should be persisted in the journal.
-     *
-     * @param storageLevel The log storage level.
-     * @return The storage builder.
-     */
-    public Builder<E> withStorageLevel(StorageLevel storageLevel) {
-      this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
-      return this;
+    @Override
+    public boolean isOpen() {
+        return open.get();
     }
 
-    /**
-     * Sets the log directory, returning the builder for method chaining.
-     * <p>
-     * The log will write segment files into the provided directory.
-     *
-     * @param directory The log directory.
-     * @return The storage builder.
-     * @throws NullPointerException If the {@code directory} is {@code null}
-     */
-    public Builder<E> withDirectory(String directory) {
-      return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
+    @Override
+    public void close() {
+        if (open.compareAndExchange(true, false)) {
+            journal.close();
+        }
     }
 
     /**
-     * Sets the log directory, returning the builder for method chaining.
+     * Compacts the journal up to the given index.
      * <p>
-     * The log will write segment files into the provided directory.
+     * The semantics of compaction are not specified by this interface.
      *
-     * @param directory The log directory.
-     * @return The storage builder.
-     * @throws NullPointerException If the {@code directory} is {@code null}
+     * @param index The index up to which to compact the journal.
      */
-    public Builder<E> withDirectory(File directory) {
-      this.directory = requireNonNull(directory, "directory cannot be null");
-      return this;
+    public void compact(final long index) {
+        journal.compact(index);
     }
 
     /**
-     * Sets the journal namespace, returning the builder for method chaining.
+     * Returns a new segmented journal builder.
      *
-     * @param namespace The journal serializer.
-     * @return The journal builder.
+     * @return A new segmented journal builder.
      */
-    public Builder<E> withNamespace(JournalSerdes namespace) {
-      this.namespace = requireNonNull(namespace, "namespace cannot be null");
-      return this;
+    public static <E> Builder<E> builder() {
+        return new Builder<>();
     }
 
-    /**
-     * Sets the maximum segment size in bytes, returning the builder for method chaining.
-     * <p>
-     * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
-     * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
-     * segment and append new entries to that segment.
-     * <p>
-     * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
-     *
-     * @param maxSegmentSize The maximum segment size in bytes.
-     * @return The storage builder.
-     * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
-     */
-    public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
-      checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
-          "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
-      this.maxSegmentSize = maxSegmentSize;
-      return this;
-    }
+    public static final class Builder<E> {
+        private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
+        private ByteBufMapper<E> mapper;
 
-    /**
-     * Sets the maximum entry size in bytes, returning the builder for method chaining.
-     *
-     * @param maxEntrySize the maximum entry size in bytes
-     * @return the storage builder
-     * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
-     */
-    public Builder<E> withMaxEntrySize(int maxEntrySize) {
-      checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
-      this.maxEntrySize = maxEntrySize;
-      return this;
-    }
+        private Builder() {
+            // on purpose
+        }
 
-    /**
-     * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
-     * <p>
-     * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
-     * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
-     * new segment and append new entries to that segment.
-     * <p>
-     * By default, the maximum entries per segment is {@code 1024 * 1024}.
-     *
-     * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
-     * @return The storage builder.
-     * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
-     *     per segment
-     * @deprecated since 3.0.2
-     */
-    @Deprecated
-    public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
-      checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
-      checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
-          "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
-      this.maxEntriesPerSegment = maxEntriesPerSegment;
-      return this;
-    }
+        /**
+         * Sets the journal name.
+         *
+         * @param name The journal name.
+         * @return The journal builder.
+         */
+        public Builder<E> withName(final String name) {
+            byteJournalBuilder.withName(name);
+            return this;
+        }
 
-    /**
-     * Sets the journal index density.
-     * <p>
-     * The index density is the frequency at which the position of entries written to the journal will be recorded in an
-     * in-memory index for faster seeking.
-     *
-     * @param indexDensity the index density
-     * @return the journal builder
-     * @throws IllegalArgumentException if the density is not between 0 and 1
-     */
-    public Builder<E> withIndexDensity(double indexDensity) {
-      checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
-      this.indexDensity = indexDensity;
-      return this;
-    }
+        /**
+         * Sets the journal storage level.
+         * <p>
+         * The storage level indicates how individual entries will be persisted in the journal.
+         *
+         * @param storageLevel The log storage level.
+         * @return The journal builder.
+         */
+        public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
+            byteJournalBuilder.withStorageLevel(storageLevel);
+            return this;
+        }
 
-    /**
-     * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
-     * chaining.
-     * <p>
-     * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
-     * committed in a given segment.
-     *
-     * @return The storage builder.
-     */
-    public Builder<E> withFlushOnCommit() {
-      return withFlushOnCommit(true);
-    }
+        /**
+         * Sets the journal storage directory.
+         * <p>
+         * The journal will write segment files into the provided directory.
+         *
+         * @param directory The journal storage directory.
+         * @return The journal builder.
+         * @throws NullPointerException If the {@code directory} is {@code null}
+         */
+        public Builder<E> withDirectory(final String directory) {
+            byteJournalBuilder.withDirectory(directory);
+            return this;
+        }
 
-    /**
-     * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
-     * chaining.
-     * <p>
-     * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
-     * committed in a given segment.
-     *
-     * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
-     * @return The storage builder.
-     */
-    public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
-      this.flushOnCommit = flushOnCommit;
-      return this;
-    }
+        /**
+         * Sets the journal storage directory.
+         * <p>
+         * The journal will write segment files into the provided directory.
+         *
+         * @param directory The journal storage directory.
+         * @return The journal builder.
+         * @throws NullPointerException If the {@code directory} is {@code null}
+         */
+        public Builder<E> withDirectory(final File directory) {
+             byteJournalBuilder.withDirectory(directory);
+            return this;
+        }
 
-    /**
-     * Build the {@link SegmentedJournal}.
-     *
-     * @return A new {@link SegmentedJournal}.
-     */
-    public SegmentedJournal<E> build() {
-      return new SegmentedJournal<>(
-          name,
-          storageLevel,
-          directory,
-          namespace,
-          maxSegmentSize,
-          maxEntrySize,
-          maxEntriesPerSegment,
-          indexDensity,
-          flushOnCommit);
+        /**
+         * Sets the journal namespace.
+         *
+         * @param namespace The journal serializer.
+         * @return The journal builder.
+         * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
+         */
+        @Deprecated(forRemoval = true, since="9.0.3")
+        public Builder<E> withNamespace(final JournalSerdes namespace) {
+            return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
+        }
+
+        /**
+         * Sets journal serializer.
+         *
+         * @param mapper Journal serializer
+         * @return The journal builder
+         */
+        public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
+            this.mapper = requireNonNull(mapper);
+            return this;
+        }
+
+        /**
+         * Sets the maximum segment size in bytes.
+         * <p>
+         * The maximum segment size dictates when journal should roll over to new segments. As entries are written
+         * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
+         * journal will create a new segment and append new entries to that segment.
+         * <p>
+         * By default, the maximum segment size is 32M.
+         *
+         * @param maxSegmentSize The maximum segment size in bytes.
+         * @return The storage builder.
+         * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
+         */
+        public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
+            byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
+            return this;
+        }
+
+        /**
+         * Sets the maximum entry size in bytes.
+         *
+         * @param maxEntrySize the maximum entry size in bytes
+         * @return the storage builder
+         * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
+         */
+        public Builder<E> withMaxEntrySize(final int maxEntrySize) {
+            byteJournalBuilder.withMaxEntrySize(maxEntrySize);
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of entries per segment.
+         *
+         * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
+         * @return The journal builder.
+         * @deprecated since 3.0.2, no longer used
+         */
+        @Deprecated
+        public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
+            // ignore
+            return this;
+        }
+
+        /**
+         * Sets the journal index density.
+         * <p>
+         * The index density is the frequency at which the position of entries written to the journal will be recorded
+         * in an in-memory index for faster seeking.
+         *
+         * @param indexDensity the index density
+         * @return the journal builder
+         * @throws IllegalArgumentException if the density is not between 0 and 1
+         */
+        public Builder<E> withIndexDensity(final double indexDensity) {
+            byteJournalBuilder.withIndexDensity(indexDensity);
+            return this;
+        }
+
+        /**
+         * Enables flushing buffers to disk when entries are committed to a segment.
+         * <p>
+         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
+         * entry is committed in a given segment.
+         *
+         * @return The journal builder.
+         */
+        public Builder<E> withFlushOnCommit() {
+            return withFlushOnCommit(true);
+        }
+
+        /**
+         * Enables flushing buffers to disk when entries are committed to a segment.
+         * <p>
+         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
+         * entry is committed in a given segment.
+         *
+         * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
+         * @return The journal builder.
+         */
+        public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
+            byteJournalBuilder.withFlushOnCommit(flushOnCommit);
+            return this;
+        }
+
+        /**
+         * Build the {@link SegmentedJournal}.
+         *
+         * @return {@link SegmentedJournal} instance.
+         */
+        public SegmentedJournal<E> build() {
+            return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
+        }
     }
-  }
 }
index a5deb6382ec9bf372746e77a82820333ff26fbd3..f28390c84b9f592d4098996124eb8b2e98ca0e5e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,134 +18,49 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 
 /**
- * A {@link JournalReader} traversing all entries.
+ * A {@link JournalReader} backed by a {@link ByteBufReader}.
  */
-sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
-    // Marker non-null object for tryAdvance()
-    private static final @NonNull Object ADVANCED = new Object();
+final class SegmentedJournalReader<E> implements JournalReader<E> {
+    private final ByteBufMapper<E> mapper;
+    private final ByteBufReader reader;
 
-    final SegmentedJournal<E> journal;
-
-    private JournalSegment currentSegment;
-    private JournalSegmentReader currentReader;
-    private long nextIndex;
-
-    SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
-        this.journal = requireNonNull(journal);
-        currentSegment = requireNonNull(segment);
-        currentReader = segment.createReader();
-        nextIndex = currentSegment.firstIndex();
+    SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper<E> mapper) {
+        this.reader = requireNonNull(reader);
+        this.mapper = requireNonNull(mapper);
     }
 
     @Override
-    public final long getFirstIndex() {
-        return journal.getFirstSegment().firstIndex();
+    public long getFirstIndex() {
+        return reader.firstIndex();
     }
 
     @Override
-    public final long getNextIndex() {
-        return nextIndex;
+    public long getNextIndex() {
+        return reader.nextIndex();
     }
 
     @Override
-    public final void reset() {
-        currentReader.close();
-
-        currentSegment = journal.getFirstSegment();
-        currentReader = currentSegment.createReader();
-        nextIndex = currentSegment.firstIndex();
+    public void reset() {
+        reader.reset();
     }
 
     @Override
-    public final void reset(final long index) {
-        // If the current segment is not open, it has been replaced. Reset the segments.
-        if (!currentSegment.isOpen()) {
-            reset();
-        }
-
-        if (index < nextIndex) {
-            rewind(index);
-        } else if (index > nextIndex) {
-            while (index > nextIndex && tryAdvance()) {
-                // Nothing else
-            }
-        } else {
-            resetCurrentReader(index);
-        }
-    }
-
-    private void resetCurrentReader(final long index) {
-        final var position = currentSegment.lookup(index - 1);
-        if (position != null) {
-            nextIndex = position.index();
-            currentReader.setPosition(position.position());
-        } else {
-            nextIndex = currentSegment.firstIndex();
-            currentReader.setPosition(JournalSegmentDescriptor.BYTES);
-        }
-        while (nextIndex < index && tryAdvance()) {
-            // Nothing else
-        }
-    }
-
-    /**
-     * Rewinds the journal to the given index.
-     */
-    private void rewind(final long index) {
-        if (currentSegment.firstIndex() >= index) {
-            JournalSegment segment = journal.getSegment(index - 1);
-            if (segment != null) {
-                currentReader.close();
-
-                currentSegment = segment;
-                currentReader = currentSegment.createReader();
-            }
-        }
-
-        resetCurrentReader(index);
+    public void reset(final long index) {
+        reader.reset(index);
     }
 
     @Override
-    public <T> T tryNext(final EntryMapper<E, T> mapper) {
-        final var index = nextIndex;
-        var buf = currentReader.readBytes(index);
-        if (buf == null) {
-            final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
-            if (nextSegment == null || nextSegment.firstIndex() != index) {
-                return null;
-            }
-
-            currentReader.close();
-
-            currentSegment = nextSegment;
-            currentReader = currentSegment.createReader();
-            buf = currentReader.readBytes(index);
-            if (buf == null) {
-                return null;
-            }
-        }
-
-        final var entry = journal.serializer().deserialize(buf);
-        final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes()));
-        nextIndex = index + 1;
-        return ret;
-    }
-
-    /**
-     * Try to move to the next entry.
-     *
-     * @return {@code true} if there was a next entry and this reader has moved to it
-     */
-    final boolean tryAdvance() {
-        return tryNext((index, entry, size) -> ADVANCED) != null;
+    public <T> @Nullable T tryNext(final EntryMapper<E, T> entryMapper) {
+        return reader.tryNext(
+            (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes()))
+        );
     }
 
     @Override
-    public final void close() {
-        currentReader.close();
-        journal.closeReader(this);
+    public void close() {
+        reader.close();
     }
 }
index 71120891a1514847ed44789c10e1f5b95692d0de..7c331ccb2463d5bf595a23ae319712cd4a042d38 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package io.atomix.storage.journal;
 
-import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
 
 /**
- * Raft log writer.
+ * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
  */
 final class SegmentedJournalWriter<E> implements JournalWriter<E> {
-  private final SegmentedJournal<E> journal;
-  private JournalSegment currentSegment;
-  private JournalSegmentWriter currentWriter;
+    private final ByteBufMapper<E> mapper;
+    private final ByteBufWriter writer;
 
-  SegmentedJournalWriter(SegmentedJournal<E> journal) {
-    this.journal = journal;
-    this.currentSegment = journal.getLastSegment();
-    this.currentWriter = currentSegment.acquireWriter();
-  }
-
-  @Override
-  public long getLastIndex() {
-    return currentWriter.getLastIndex();
-  }
-
-  @Override
-  public long getNextIndex() {
-    return currentWriter.getNextIndex();
-  }
-
-  @Override
-  public void reset(long index) {
-    if (index > currentSegment.firstIndex()) {
-      currentSegment.releaseWriter();
-      currentSegment = journal.resetSegments(index);
-      currentWriter = currentSegment.acquireWriter();
-    } else {
-      truncate(index - 1);
+    SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+        this.writer = requireNonNull(writer);
+        this.mapper = requireNonNull(mapper);
     }
-    journal.resetHead(index);
-  }
 
-  @Override
-  public void commit(long index) {
-    if (index > journal.getCommitIndex()) {
-      journal.setCommitIndex(index);
-      if (journal.isFlushOnCommit()) {
-        flush();
-      }
+    @Override
+    public long getLastIndex() {
+        return writer.lastIndex();
     }
-  }
 
-  @Override
-  public <T extends E> Indexed<T> append(T entry) {
-    final var bytes = journal.serializer().serialize(entry);
-    var index = currentWriter.append(bytes);
-    if (index != null) {
-      return new Indexed<>(index, entry, bytes.readableBytes());
+    @Override
+    public long getNextIndex() {
+        return writer.nextIndex();
     }
 
-    //  Slow path: we do not have enough capacity
-    currentWriter.flush();
-    currentSegment.releaseWriter();
-    currentSegment = journal.getNextSegment();
-    currentWriter = currentSegment.acquireWriter();
-    final var newIndex = verifyNotNull(currentWriter.append(bytes));
-    return new Indexed<>(newIndex, entry, bytes.readableBytes());
-  }
-
-  @Override
-  public void truncate(long index) {
-    if (index < journal.getCommitIndex()) {
-      throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+    @Override
+    public void reset(final long index) {
+        writer.reset(index);
     }
 
-    // Delete all segments with first indexes greater than the given index.
-    while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) {
-      currentSegment.releaseWriter();
-      journal.removeSegment(currentSegment);
-      currentSegment = journal.getLastSegment();
-      currentWriter = currentSegment.acquireWriter();
+    @Override
+    public void commit(final long index) {
+        writer.commit(index);
     }
 
-    // Truncate the current index.
-    currentWriter.truncate(index);
+    @Override
+    public <T extends E> Indexed<T> append(final T entry) {
+        final var buf = mapper.objectToBytes(entry);
+        return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+    }
 
-    // Reset segment readers.
-    journal.resetTail(index + 1);
-  }
+    @Override
+    public void truncate(final long index) {
+        writer.truncate(index);
+    }
 
-  @Override
-  public void flush() {
-    currentWriter.flush();
-  }
+    @Override
+    public void flush() {
+        writer.flush();
+    }
 }