From c3abf2298636dfc31f6d19eb4e3070fd57ecbae3 Mon Sep 17 00:00:00 2001
From: Robert Varga
Date: Wed, 8 May 2024 23:48:14 +0200
Subject: [PATCH] Clean up Segmented(ByteBuf)Journal

Remove SegmentedJournal.Builder(), as it really has no point. Also
expose the compact() method and restore maxEntriesPerSegment.

JIRA: CONTROLLER-2115
Change-Id: I7213cdca037d45ad9f15f2e577bd8cb1e6b75156
Signed-off-by: Robert Varga
---
 .../storage/journal/ByteBufJournal.java       |  10 +
 .../io/atomix/storage/journal/Journal.java    |  10 +
 .../journal/SegmentedByteBufJournal.java      |  50 ++--
 .../SegmentedCommitsByteBufReader.java        |  14 +-
 .../storage/journal/SegmentedJournal.java     | 216 +-----------------
 .../storage/journal/AbstractJournalTest.java  |   7 +-
 .../akka/segjournal/DataJournalV0.java        |  18 +-
 .../segjournal/SegmentedJournalActor.java     |   8 +-
 8 files changed, 96 insertions(+), 237 deletions(-)

diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
index dc3e75bcde..cae71fffcb 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufJournal.java
@@ -53,6 +53,16 @@ public interface ByteBufJournal extends AutoCloseable {
      */
     ByteBufReader openCommitsReader(long index);
 
+    /**
+     * Compacts the journal up to the given index.
+     *
+     * <p>
+     * The semantics of compaction are not specified by this interface.
+     *
+     * @param index The index up to which to compact the journal.
+     */
+    void compact(long index);
+
     @Override
     void close();
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java
index 39be7d4d5f..b80324e9a1 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/Journal.java
@@ -56,6 +56,16 @@ public interface Journal<E> extends AutoCloseable {
      */
     JournalReader<E> openReader(long index, Mode mode);
 
+    /**
+     * Compacts the journal up to the given index.
+     *
+     * <p>
+     * The semantics of compaction are not specified by this interface.
+     *
+     * @param index The index up to which to compact the journal.
+     */
+    void compact(long index);
+
     @Override
     void close();
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
index 0d42181851..2afc434f33 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufJournal.java
@@ -48,6 +48,8 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private final File directory;
     private final int maxSegmentSize;
     private final int maxEntrySize;
+    @Deprecated(forRemoval = true)
+    private final int maxEntriesPerSegment;
     private final double indexDensity;
     private final boolean flushOnCommit;
     private final @NonNull ByteBufWriter writer;
@@ -56,13 +58,15 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
     private JournalSegment currentSegment;
     private volatile long commitIndex;
 
-    public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
-            final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
+    SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
+            final int maxSegmentSize, final int maxEntrySize, final int maxEntriesPerSegment, final double indexDensity,
+            final boolean flushOnCommit) {
         this.name = requireNonNull(name, "name cannot be null");
         this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
         this.directory = requireNonNull(directory, "directory cannot be null");
         this.maxSegmentSize = maxSegmentSize;
         this.maxEntrySize = maxEntrySize;
+        this.maxEntriesPerSegment = maxEntriesPerSegment;
         this.indexDensity = indexDensity;
         this.flushOnCommit = flushOnCommit;
 
@@ -256,8 +260,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
                 .withId(segmentId)
                 .withIndex(firstIndex)
                 .withMaxSegmentSize(maxSegmentSize)
-                // FIXME: propagate maxEntries
-                .withMaxEntries(Integer.MAX_VALUE)
+                .withMaxEntries(maxEntriesPerSegment)
                 .withUpdated(System.currentTimeMillis())
                 .build());
         } catch (IOException e) {
@@ -390,14 +393,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
     }
 
-    /**
-     * Compacts the journal up to the given index.
-     *
-     * <p>
-     * The semantics of compaction are not specified by this interface.
-     *
-     * @param index The index up to which to compact the journal.
-     */
+    @Override
     public void compact(final long index) {
         final var firstIndex = getCompactableIndex(index);
         if (firstIndex != 0) {
@@ -460,6 +456,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
         private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
         private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
+        private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
         private static final double DEFAULT_INDEX_DENSITY = .005;
 
         private String name = DEFAULT_NAME;
@@ -467,6 +464,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
         private File directory = new File(DEFAULT_DIRECTORY);
         private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
         private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+        private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
         private double indexDensity = DEFAULT_INDEX_DENSITY;
         private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
 
@@ -547,6 +545,32 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
             return this;
         }
 
+        /**
+         * Sets the maximum number of allowed entries per segment, returning the builder for method chaining.
+         *
+         * <p>
+         * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a
+         * segment of the log, if the entry count in that segment meets the configured maximum entry count, the log will
+         * create a new segment and append new entries to that segment.
+         *
+         * <p>
+         * By default, the maximum entries per segment is {@code 1024 * 1024}.
+         *
+         * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
+         * @return The storage builder.
+         * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} is not positive or is greater than the
+         *     default max entries per segment
+         * @deprecated This option is scheduled for removal.
+         */
+        @Deprecated(forRemoval = true, since = "9.0.3")
+        public Builder withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
+            checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
+            checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
+                "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
+            this.maxEntriesPerSegment = maxEntriesPerSegment;
+            return this;
+        }
+
         /**
          * Sets the journal index density.
          *
@@ -599,7 +623,7 @@ public final class SegmentedByteBufJournal implements ByteBufJournal {
          */
         public SegmentedByteBufJournal build() {
             return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
-                indexDensity, flushOnCommit);
+                maxEntriesPerSegment, indexDensity, flushOnCommit);
         }
     }
 }
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java
index 43fae62a1e..135549a9a1 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedCommitsByteBufReader.java
@@ -1,9 +1,17 @@
 /*
  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
  *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package io.atomix.storage.journal;
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
index 35b0cb87bc..11970ba083 100644
--- a/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
+++ b/atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
@@ -18,17 +18,17 @@ package io.atomix.storage.journal;
 
 import static java.util.Objects.requireNonNull;
 
-import java.io.File;
+import com.google.common.base.MoreObjects;
 
 /**
- * Segmented journal.
+ * A {@link Journal} implementation based on a {@link ByteBufJournal}.
  */
 public final class SegmentedJournal<E> implements Journal<E> {
-    private final SegmentedByteBufJournal journal;
     private final SegmentedJournalWriter<E> writer;
     private final ByteBufMapper<E> mapper;
+    private final ByteBufJournal journal;
 
-    public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
+    public SegmentedJournal(final ByteBufJournal journal, final ByteBufMapper<E> mapper) {
         this.journal = requireNonNull(journal, "journal is required");
         this.mapper = requireNonNull(mapper, "mapper cannot be null");
         writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
@@ -66,213 +66,17 @@ public final class SegmentedJournal<E> implements Journal<E> {
     }
 
     @Override
-    public void close() {
-        journal.close();
-    }
-
-    /**
-     * Compacts the journal up to the given index.
-     *
-     * <p>
-     * The semantics of compaction are not specified by this interface.
-     *
-     * @param index The index up to which to compact the journal.
-     */
     public void compact(final long index) {
         journal.compact(index);
     }
 
-    /**
-     * Returns a new segmented journal builder.
-     *
-     * @return A new segmented journal builder.
-     */
-    public static <E> Builder<E> builder() {
-        return new Builder<>();
+    @Override
+    public void close() {
+        journal.close();
     }
 
-    public static final class Builder<E> {
-        private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
-        private ByteBufMapper<E> mapper;
-
-        private Builder() {
-            // on purpose
-        }
-
-        /**
-         * Sets the journal name.
-         *
-         * @param name The journal name.
-         * @return The journal builder.
-         */
-        public Builder<E> withName(final String name) {
-            byteJournalBuilder.withName(name);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage level.
-         *
-         * <p>
-         * The storage level indicates how individual entries will be persisted in the journal.
-         *
-         * @param storageLevel The log storage level.
-         * @return The journal builder.
-         */
-        public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
-            byteJournalBuilder.withStorageLevel(storageLevel);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage directory.
-         *
-         * <p>
-         * The journal will write segment files into the provided directory.
-         *
-         * @param directory The journal storage directory.
-         * @return The journal builder.
-         * @throws NullPointerException If the {@code directory} is {@code null}
-         */
-        public Builder<E> withDirectory(final String directory) {
-            byteJournalBuilder.withDirectory(directory);
-            return this;
-        }
-
-        /**
-         * Sets the journal storage directory.
-         *
-         * <p>
-         * The journal will write segment files into the provided directory.
-         *
-         * @param directory The journal storage directory.
-         * @return The journal builder.
-         * @throws NullPointerException If the {@code directory} is {@code null}
-         */
-        public Builder<E> withDirectory(final File directory) {
-            byteJournalBuilder.withDirectory(directory);
-            return this;
-        }
-
-        /**
-         * Sets the journal namespace.
-         *
-         * @param namespace The journal serializer.
-         * @return The journal builder.
-         * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
-         */
-        @Deprecated(forRemoval = true, since = "9.0.3")
-        public Builder<E> withNamespace(final JournalSerdes namespace) {
-            return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
-        }
-
-        /**
-         * Sets journal serializer.
-         *
-         * @param mapper Journal serializer
-         * @return The journal builder
-         */
-        public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
-            this.mapper = requireNonNull(mapper);
-            return this;
-        }
-
-        /**
-         * Sets the maximum segment size in bytes.
-         *
-         * <p>
-         * The maximum segment size dictates when journal should roll over to new segments. As entries are written
-         * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
-         * journal will create a new segment and append new entries to that segment.
-         *
-         * <p>
-         * By default, the maximum segment size is 32M.
-         *
-         * @param maxSegmentSize The maximum segment size in bytes.
-         * @return The storage builder.
-         * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
-         */
-        public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
-            byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
-            return this;
-        }
-
-        /**
-         * Sets the maximum entry size in bytes.
-         *
-         * @param maxEntrySize the maximum entry size in bytes
-         * @return the storage builder
-         * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
-         */
-        public Builder<E> withMaxEntrySize(final int maxEntrySize) {
-            byteJournalBuilder.withMaxEntrySize(maxEntrySize);
-            return this;
-        }
-
-        /**
-         * Sets the maximum number of entries per segment.
-         *
-         * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
-         * @return The journal builder.
-         * @deprecated since 3.0.2, no longer used
-         */
-        @Deprecated
-        public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
-            // ignore
-            return this;
-        }
-
-        /**
-         * Sets the journal index density.
-         *
-         * <p>
-         * The index density is the frequency at which the position of entries written to the journal will be recorded
-         * in an in-memory index for faster seeking.
-         *
-         * @param indexDensity the index density
-         * @return the journal builder
-         * @throws IllegalArgumentException if the density is not between 0 and 1
-         */
-        public Builder<E> withIndexDensity(final double indexDensity) {
-            byteJournalBuilder.withIndexDensity(indexDensity);
-            return this;
-        }
-
-        /**
-         * Enables flushing buffers to disk when entries are committed to a segment.
-         *
-         * <p>
-         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
-         * entry is committed in a given segment.
-         *
-         * @return The journal builder.
-         */
-        public Builder<E> withFlushOnCommit() {
-            return withFlushOnCommit(true);
-        }
-
-        /**
-         * Enables flushing buffers to disk when entries are committed to a segment.
-         *
-         * <p>
-         * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
-         * entry is committed in a given segment.
-         *
-         * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
-         * @return The journal builder.
-         */
-        public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
-            byteJournalBuilder.withFlushOnCommit(flushOnCommit);
-            return this;
-        }
-
-        /**
-         * Build the {@link SegmentedJournal}.
-         *
-         * @return {@link SegmentedJournal} instance.
-         */
-        public SegmentedJournal<E> build() {
-            return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
-        }
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("journal", journal).toString();
     }
 }
diff --git a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
index 05a758a366..d5227b83cb 100644
--- a/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
+++ b/atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
@@ -76,15 +76,14 @@ public abstract class AbstractJournalTest {
         return runs;
     }
 
-    protected SegmentedJournal<TestEntry> createJournal() {
-        return SegmentedJournal.<TestEntry>builder()
+    private SegmentedJournal<TestEntry> createJournal() {
+        return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
             .withName("test")
             .withDirectory(PATH.toFile())
-            .withNamespace(NAMESPACE)
             .withStorageLevel(storageLevel)
             .withMaxSegmentSize(maxSegmentSize)
             .withIndexDensity(.2)
-            .build();
+            .build(), NAMESPACE.toMapper());
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
index bf1700f7f0..c5f9d7205e 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
@@ -14,6 +14,7 @@ import com.google.common.base.VerifyException;
 import io.atomix.storage.journal.JournalReader;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.JournalWriter;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -40,13 +41,16 @@ final class DataJournalV0 extends DataJournal {
     DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
             final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
         super(persistenceId, messageSize);
-        entries = SegmentedJournal.<DataJournalEntry>builder()
-            .withStorageLevel(storage).withDirectory(directory).withName("data")
-            .withNamespace(JournalSerdes.builder()
-                .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
-                .build())
-            .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
-            .build();
+        entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+            .withDirectory(directory)
+            .withName("data")
+            .withStorageLevel(storage)
+            .withMaxEntrySize(maxEntrySize)
+            .withMaxSegmentSize(maxSegmentSize)
+            .build(),
+            JournalSerdes.builder()
+                .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
+                .build().toMapper());
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
index 7e285f7d0d..89d9c64c25 100644
--- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
+++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
 import java.io.File;
@@ -494,15 +495,14 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
         }
 
         final var sw = Stopwatch.createStarted();
-        deleteJournal = SegmentedJournal.<Long>builder()
+        deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
             .withDirectory(directory)
             .withName("delete")
-            .withNamespace(DELETE_NAMESPACE)
             .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
-            .build();
+            .build(), DELETE_NAMESPACE.toMapper());
         final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
-        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+        lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
 
         dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
             maxEntrySize, maxSegmentSize);
-- 
2.36.6
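
Usage sketch (illustrative, not part of the patch): with SegmentedJournal.Builder removed, callers assemble a
SegmentedByteBufJournal through its builder and wrap it in a SegmentedJournal together with a ByteBufMapper, as the
updated AbstractJournalTest and DataJournalV0 call sites above do. Only the builder methods, the SegmentedJournal
constructor and the newly exposed compact() come from this patch; the entry type, the mapper argument, the storage
level and the size values below are assumptions chosen for the example.

    import io.atomix.storage.journal.ByteBufMapper;
    import io.atomix.storage.journal.SegmentedByteBufJournal;
    import io.atomix.storage.journal.SegmentedJournal;
    import io.atomix.storage.journal.StorageLevel;
    import java.io.File;

    final class ExampleJournalFactory {
        private ExampleJournalFactory() {
            // utility class
        }

        // Opens a String-valued journal in the given directory; sizes and storage level are example values.
        static SegmentedJournal<String> open(final File directory, final ByteBufMapper<String> mapper) {
            return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
                .withName("example")
                .withDirectory(directory)
                .withStorageLevel(StorageLevel.DISK)
                .withMaxEntrySize(1024 * 1024)
                .withMaxSegmentSize(32 * 1024 * 1024)
                .build(), mapper);
        }

        // compact() is now part of the Journal contract: discard segments entirely below firstRetainedIndex.
        static void trim(final SegmentedJournal<String> journal, final long firstRetainedIndex) {
            journal.compact(firstRetainedIndex);
        }
    }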