Clean up Segmented(ByteBuf)Journal 67/111667/3
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 8 May 2024 21:48:14 +0000 (23:48 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 9 May 2024 00:20:03 +0000 (00:20 +0000)
Remove SegmentedJournal.Builder(), as it really has no point. Also
expose the compact() method and restore maxEntriesPerSegment.

JIRA: CONTROLLER-2115
Change-Id: I7213cdca037d45ad9f15f2e577bd8cb1e6b75156
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index dc3e75bcde0888fce6e8675696550c0c3ccd7a91..cae71fffcbfbbceefcb13bdb712502db6a645a28 100644 (file)
@@ -53,6 +53,16 @@ public interface ByteBufJournal extends AutoCloseable {
      */
     ByteBufReader openCommitsReader(long index);
 
+    /**
+     * Compacts the journal up to the given index.
+     *
+     * <p>
+     * The semantics of compaction are not specified by this interface.
+     *
+     * @param index The index up to which to compact the journal.
+     */
+    void compact(long index);
+
     @Override
     void close();
 }
index 39be7d4d5f7338019f4d41b84ec8e9dd962fae0d..b80324e9a1fc1efde276219e28b271e64f20a4e0 100644 (file)
@@ -56,6 +56,16 @@ public interface Journal<E> extends AutoCloseable {
      */
     JournalReader<E> openReader(long index, Mode mode);
 
+    /**
+     * Compacts the journal up to the given index.
+     *
+     * <p>
+     * The semantics of compaction are not specified by this interface.
+     *
+     * @param index The index up to which to compact the journal.
+     */
+    void compact(long index);
+
     @Override
     void close();
 }
index 0d42181851f5c4d7e587e64180e65ba97f35baed..2afc434f33d78345a5995cdb3123bd99ec85395f 100644 (file)
@@ -48,6 +48,8 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private final File directory;
     private final int maxSegmentSize;
     private final int maxEntrySize;
+    @Deprecated(forRemoval = true)
+    private final int maxEntriesPerSegment;
     private final double indexDensity;
     private final boolean flushOnCommit;
     private final @NonNull ByteBufWriter writer;
@@ -56,13 +58,15 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private JournalSegment currentSegment;
     private volatile long commitIndex;
 
-    public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
-            final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
+    SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
+            final int maxSegmentSize, final int maxEntrySize, final int maxEntriesPerSegment, final double indexDensity,
+            final boolean flushOnCommit) {
         this.name = requireNonNull(name, "name cannot be null");
         this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
         this.directory = requireNonNull(directory, "directory cannot be null");
         this.maxSegmentSize = maxSegmentSize;
         this.maxEntrySize = maxEntrySize;
+        this.maxEntriesPerSegment = maxEntriesPerSegment;
         this.indexDensity = indexDensity;
         this.flushOnCommit = flushOnCommit;
 
@@ -256,8 +260,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
                 .withId(segmentId)
                 .withIndex(firstIndex)
                 .withMaxSegmentSize(maxSegmentSize)
-                // FIXME: propagate maxEntries
-                .withMaxEntries(Integer.MAX_VALUE)
+                .withMaxEntries(maxEntriesPerSegment)
                 .withUpdated(System.currentTimeMillis())
                 .build());
         } catch (IOException e) {
@@ -390,14 +393,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
     }
 
-    /**
-     * Compacts the journal up to the given index.
-     *
-     * <p>
-     * The semantics of compaction are not specified by this interface.
-     *
-     * @param index The index up to which to compact the journal.
-     */
+    @Override
     public void compact(final long index) {
         final var firstIndex = getCompactableIndex(index);
         if (firstIndex != 0) {
@@ -460,6 +456,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
         private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
         private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
+        private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
         private static final double DEFAULT_INDEX_DENSITY = .005;
 
         private String name = DEFAULT_NAME;
@@ -467,6 +464,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         private File directory = new File(DEFAULT_DIRECTORY);
         private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
         private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+        private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
         private double indexDensity = DEFAULT_INDEX_DENSITY;
         private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
 
@@ -547,6 +545,32 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
             return this;
         }
 
+        /**
+         * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
+         *
+         * <p>
+         * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a
+         * segment of the log, if the entry count in that segment meets the configured maximum entry count, the log will
+         * create a new segment and append new entries to that segment.
+         *
+         * <p>
+         * By default, the maximum entries per segment is {@code 1024 * 1024}.
+         *
+         * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
+         * @return The storage builder.
+         * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
+         *     per segment
+         * @deprecated This option has no effect and is scheduled for removal.
+         */
+        @Deprecated(forRemoval = true, since = "9.0.3")
+        public Builder withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
+            checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
+            checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
+                "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
+            this.maxEntriesPerSegment = maxEntriesPerSegment;
+            return this;
+        }
+
         /**
          * Sets the journal index density.
          *
@@ -599,7 +623,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
          */
         public SegmentedByteBufJournal build() {
             return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
-                indexDensity, flushOnCommit);
+                maxEntriesPerSegment, indexDensity, flushOnCommit);
         }
     }
 }
index 43fae62a1e48b6a9299929c16ea4b240093e2b92..135549a9a197f67953c89fa79a7b14f4d6846a30 100644 (file)
@@ -1,9 +1,17 @@
 /*
  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
  *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package io.atomix.storage.journal;
 
index 35b0cb87bc47dd6f58f127bd07155d1c3a33c27d..11970ba083ce6d3521ac7616c1d4e154adf489c7 100644 (file)
@@ -18,17 +18,17 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.File;
+import com.google.common.base.MoreObjects;
 
 /**
- * Segmented journal.
+ * A {@link Journal} implementation based on a {@link ByteBufJournal}.
  */
 public final class SegmentedJournal<E> implements Journal<E> {
-    private final SegmentedByteBufJournal journal;
     private final SegmentedJournalWriter<E> writer;
     private final ByteBufMapper<E> mapper;
+    private final ByteBufJournal journal;
 
-    public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
+    public SegmentedJournal(final ByteBufJournal journal, final ByteBufMapper<E> mapper) {
         this.journal = requireNonNull(journal, "journal is required");
         this.mapper = requireNonNull(mapper, "mapper cannot be null");
         writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
@@ -66,213 +66,17 @@ public final class SegmentedJournal<E> implements Journal<E> {
     }
 
     @Override
-    public void close() {
-        journal.close();
-    }
-
-    /**
-     * Compacts the journal up to the given index.
-     *
-     * <p>
-     * The semantics of compaction are not specified by this interface.
-     *
-     * @param index The index up to which to compact the journal.
-     */
     public void compact(final long index) {
         journal.compact(index);
     }
 
-    /**
-     * Returns a new segmented journal builder.
-     *
-     * @return A new segmented journal builder.
-     */
-    public static <E> Builder<E> builder() {
-        return new Builder<>();
+    @Override
+    public void close() {
+        journal.close();
     }
 
-    public static final class Builder<E> {
-        private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
-        private ByteBufMapper<E> mapper;
-
-        private Builder() {
-            // on purpose
-        }
-
-        /**
-         * Sets the journal name.
-         *
-         * @param name The journal name.
-         * @return The journal builder.
-         */
-        public Builder<E> withName(final String name) {
-            byteJournalBuilder.withName(name);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage level.
-         *
-         * <p>
-         * The storage level indicates how individual entries will be persisted in the journal.
-         *
-         * @param storageLevel The log storage level.
-         * @return The journal builder.
-         */
-        public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
-            byteJournalBuilder.withStorageLevel(storageLevel);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage directory.
-         *
-         * <p>
-         * The journal will write segment files into the provided directory.
-         *
-         * @param directory The journal storage directory.
-         * @return The journal builder.
-         * @throws NullPointerException If the {@code directory} is {@code null}
-         */
-        public Builder<E> withDirectory(final String directory) {
-            byteJournalBuilder.withDirectory(directory);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage directory.
-         *
-         * <p>
-         * The journal will write segment files into the provided directory.
-         *
-         * @param directory The journal storage directory.
-         * @return The journal builder.
-         * @throws NullPointerException If the {@code directory} is {@code null}
-         */
-        public Builder<E> withDirectory(final File directory) {
-            byteJournalBuilder.withDirectory(directory);
-            return this;
-        }
-
-        /**
-         * Sets the journal namespace.
-         *
-         * @param namespace The journal serializer.
-         * @return The journal builder.
-         * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
-         */
-        @Deprecated(forRemoval = true, since = "9.0.3")
-        public Builder<E> withNamespace(final JournalSerdes namespace) {
-            return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
-        }
-
-        /**
-         * Sets journal serializer.
-         *
-         * @param mapper Journal serializer
-         * @return The journal builder
-         */
-        public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
-            this.mapper = requireNonNull(mapper);
-            return this;
-        }
-
-        /**
-         * Sets the maximum segment size in bytes.
-         *
-         * <p>
-         * The maximum segment size dictates when journal should roll over to new segments. As entries are written
-         * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
-         * journal will create a new segment and append new entries to that segment.
-         *
-         * <p>
-         * By default, the maximum segment size is 32M.
-         *
-         * @param maxSegmentSize The maximum segment size in bytes.
-         * @return The storage builder.
-         * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
-         */
-        public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
-            byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
-            return this;
-        }
-
-        /**
-         * Sets the maximum entry size in bytes.
-         *
-         * @param maxEntrySize the maximum entry size in bytes
-         * @return the storage builder
-         * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
-         */
-        public Builder<E> withMaxEntrySize(final int maxEntrySize) {
-            byteJournalBuilder.withMaxEntrySize(maxEntrySize);
-            return this;
-        }
-
-        /**
-         * Sets the maximum number of entries per segment.
-         *
-         * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
-         * @return The journal builder.
-         * @deprecated since 3.0.2, no longer used
-         */
-        @Deprecated
-        public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
-            // ignore
-            return this;
-        }
-
-        /**
-         * Sets the journal index density.
-         *
-         * <p>
-         * The index density is the frequency at which the position of entries written to the journal will be recorded
-         * in an in-memory index for faster seeking.
-         *
-         * @param indexDensity the index density
-         * @return the journal builder
-         * @throws IllegalArgumentException if the density is not between 0 and 1
-         */
-        public Builder<E> withIndexDensity(final double indexDensity) {
-            byteJournalBuilder.withIndexDensity(indexDensity);
-            return this;
-        }
-
-        /**
-         * Enables flushing buffers to disk when entries are committed to a segment.
-         *
-         * <p>
-         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
-         * entry is committed in a given segment.
-         *
-         * @return The journal builder.
-         */
-        public Builder<E> withFlushOnCommit() {
-            return withFlushOnCommit(true);
-        }
-
-        /**
-         * Enables flushing buffers to disk when entries are committed to a segment.
-         *
-         * <p>
-         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
-         * entry is committed in a given segment.
-         *
-         * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
-         * @return The journal builder.
-         */
-        public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
-            byteJournalBuilder.withFlushOnCommit(flushOnCommit);
-            return this;
-        }
-
-        /**
-         * Build the {@link SegmentedJournal}.
-         *
-         * @return {@link SegmentedJournal} instance.
-         */
-        public SegmentedJournal<E> build() {
-            return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
-        }
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("journal", journal).toString();
     }
 }
index 05a758a366c4ce72ad1160ec1e6177f516febfc4..d5227b83cbcd1a8610492030276e5d15f0560bda 100644 (file)
@@ -76,15 +76,14 @@ public abstract class AbstractJournalTest {
         return runs;
     }
 
-    protected SegmentedJournal<TestEntry> createJournal() {
-        return SegmentedJournal.<TestEntry>builder()
+    private SegmentedJournal<TestEntry> createJournal() {
+        return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
             .withName("test")
             .withDirectory(PATH.toFile())
-            .withNamespace(NAMESPACE)
             .withStorageLevel(storageLevel)
             .withMaxSegmentSize(maxSegmentSize)
             .withIndexDensity(.2)
-            .build();
+            .build(), NAMESPACE.toMapper());
     }
 
     @Test
index bf1700f7f04eb64dda6751e436068a662da4e097..c5f9d7205eb3d9b7886a818c8333e268079372ee 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalReader;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.JournalWriter;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -40,13 +41,16 @@ final class DataJournalV0 extends DataJournal {
     DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
             final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
         super(persistenceId, messageSize);
-        entries = SegmentedJournal.<DataJournalEntry>builder()
-                .withStorageLevel(storage).withDirectory(directory).withName("data")
-                .withNamespace(JournalSerdes.builder()
-                    .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
-                    .build())
-                .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-                .build();
+        entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("data")
+            .withStorageLevel(storage)
+            .withMaxEntrySize(maxEntrySize)
+            .withMaxSegmentSize(maxSegmentSize)
+            .build(),
+            JournalSerdes.builder()
+                .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
+                .build().toMapper());
     }
 
     @Override
index 7e285f7d0d41d3c31b23c64b84304606aee2d226..89d9c64c25fdc33a29ad30e71ae5c7e651073620 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -494,15 +495,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         final var sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder()
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
             .withDirectory(directory)
             .withName("delete")
-            .withNamespace(DELETE_NAMESPACE)
             .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
-            .build();
+            .build(), DELETE_NAMESPACE.toMapper());
         final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
-        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);