*/
ByteBufReader openCommitsReader(long index);
+ /**
+ * Compacts the journal up to the given index.
+ *
+ * <p>
+ * The semantics of compaction are not specified by this interface.
+ *
+ * @param index The index up to which to compact the journal.
+ */
+ void compact(long index);
+
@Override
void close();
}
*/
JournalReader<E> openReader(long index, Mode mode);
+ /**
+ * Compacts the journal up to the given index.
+ *
+ * <p>
+ * The semantics of compaction are not specified by this interface.
+ *
+ * @param index The index up to which to compact the journal.
+ */
+ void compact(long index);
+
@Override
void close();
}
private final File directory;
private final int maxSegmentSize;
private final int maxEntrySize;
+ @Deprecated(forRemoval = true)
+ private final int maxEntriesPerSegment;
private final double indexDensity;
private final boolean flushOnCommit;
private final @NonNull ByteBufWriter writer;
private JournalSegment currentSegment;
private volatile long commitIndex;
- public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
- final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
+ SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
+ final int maxSegmentSize, final int maxEntrySize, final int maxEntriesPerSegment, final double indexDensity,
+ final boolean flushOnCommit) {
this.name = requireNonNull(name, "name cannot be null");
this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
this.directory = requireNonNull(directory, "directory cannot be null");
this.maxSegmentSize = maxSegmentSize;
this.maxEntrySize = maxEntrySize;
+ this.maxEntriesPerSegment = maxEntriesPerSegment;
this.indexDensity = indexDensity;
this.flushOnCommit = flushOnCommit;
.withId(segmentId)
.withIndex(firstIndex)
.withMaxSegmentSize(maxSegmentSize)
- // FIXME: propagate maxEntries
- .withMaxEntries(Integer.MAX_VALUE)
+ .withMaxEntries(maxEntriesPerSegment)
.withUpdated(System.currentTimeMillis())
.build());
} catch (IOException e) {
return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
}
- /**
- * Compacts the journal up to the given index.
- *
- * <p>
- * The semantics of compaction are not specified by this interface.
- *
- * @param index The index up to which to compact the journal.
- */
+ @Override
public void compact(final long index) {
final var firstIndex = getCompactableIndex(index);
if (firstIndex != 0) {
private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
+ private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
private static final double DEFAULT_INDEX_DENSITY = .005;
private String name = DEFAULT_NAME;
private File directory = new File(DEFAULT_DIRECTORY);
private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+ private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
private double indexDensity = DEFAULT_INDEX_DENSITY;
private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
return this;
}
+ /**
+ * Sets the maximum number of allowed entries per segment, returning the builder for method chaining.
+ *
+ * <p>
+ * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a
+ * segment of the log, if the entry count in that segment meets the configured maximum entry count, the log will
+ * create a new segment and append new entries to that segment.
+ *
+ * <p>
+ * By default, the maximum entries per segment is {@code 1024 * 1024}.
+ *
+ * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
+ * @return The storage builder.
+ * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} is not positive or is greater than the
+ * default max entries per segment
+ * @deprecated This option is scheduled for removal.
+ */
+ @Deprecated(forRemoval = true, since = "9.0.3")
+ public Builder withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
+ checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
+ checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
+ "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
+ this.maxEntriesPerSegment = maxEntriesPerSegment;
+ return this;
+ }
+
/**
* Sets the journal index density.
*
*/
public SegmentedByteBufJournal build() {
return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
- indexDensity, flushOnCommit);
+ maxEntriesPerSegment, indexDensity, flushOnCommit);
}
}
}
/*
* Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package io.atomix.storage.journal;
import static java.util.Objects.requireNonNull;
-import java.io.File;
+import com.google.common.base.MoreObjects;
/**
- * Segmented journal.
+ * A {@link Journal} implementation based on a {@link ByteBufJournal}.
*/
public final class SegmentedJournal<E> implements Journal<E> {
- private final SegmentedByteBufJournal journal;
private final SegmentedJournalWriter<E> writer;
private final ByteBufMapper<E> mapper;
+ private final ByteBufJournal journal;
- public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
+ public SegmentedJournal(final ByteBufJournal journal, final ByteBufMapper<E> mapper) {
this.journal = requireNonNull(journal, "journal is required");
this.mapper = requireNonNull(mapper, "mapper cannot be null");
writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
}
@Override
- public void close() {
- journal.close();
- }
-
- /**
- * Compacts the journal up to the given index.
- *
- * <p>
- * The semantics of compaction are not specified by this interface.
- *
- * @param index The index up to which to compact the journal.
- */
public void compact(final long index) {
journal.compact(index);
}
- /**
- * Returns a new segmented journal builder.
- *
- * @return A new segmented journal builder.
- */
- public static <E> Builder<E> builder() {
- return new Builder<>();
+ @Override
+ public void close() {
+ journal.close();
}
- public static final class Builder<E> {
- private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
- private ByteBufMapper<E> mapper;
-
- private Builder() {
- // on purpose
- }
-
- /**
- * Sets the journal name.
- *
- * @param name The journal name.
- * @return The journal builder.
- */
- public Builder<E> withName(final String name) {
- byteJournalBuilder.withName(name);
- return this;
- }
-
- /**
- * Sets the journal storage level.
- *
- * <p>
- * The storage level indicates how individual entries will be persisted in the journal.
- *
- * @param storageLevel The log storage level.
- * @return The journal builder.
- */
- public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
- byteJournalBuilder.withStorageLevel(storageLevel);
- return this;
- }
-
- /**
- * Sets the journal storage directory.
- *
- * <p>
- * The journal will write segment files into the provided directory.
- *
- * @param directory The journal storage directory.
- * @return The journal builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
- */
- public Builder<E> withDirectory(final String directory) {
- byteJournalBuilder.withDirectory(directory);
- return this;
- }
-
- /**
- * Sets the journal storage directory.
- *
- * <p>
- * The journal will write segment files into the provided directory.
- *
- * @param directory The journal storage directory.
- * @return The journal builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
- */
- public Builder<E> withDirectory(final File directory) {
- byteJournalBuilder.withDirectory(directory);
- return this;
- }
-
- /**
- * Sets the journal namespace.
- *
- * @param namespace The journal serializer.
- * @return The journal builder.
- * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
- */
- @Deprecated(forRemoval = true, since = "9.0.3")
- public Builder<E> withNamespace(final JournalSerdes namespace) {
- return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
- }
-
- /**
- * Sets journal serializer.
- *
- * @param mapper Journal serializer
- * @return The journal builder
- */
- public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
- this.mapper = requireNonNull(mapper);
- return this;
- }
-
- /**
- * Sets the maximum segment size in bytes.
- *
- * <p>
- * The maximum segment size dictates when journal should roll over to new segments. As entries are written
- * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
- * journal will create a new segment and append new entries to that segment.
- *
- * <p>
- * By default, the maximum segment size is 32M.
- *
- * @param maxSegmentSize The maximum segment size in bytes.
- * @return The storage builder.
- * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
- */
- public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
- byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
- return this;
- }
-
- /**
- * Sets the maximum entry size in bytes.
- *
- * @param maxEntrySize the maximum entry size in bytes
- * @return the storage builder
- * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
- */
- public Builder<E> withMaxEntrySize(final int maxEntrySize) {
- byteJournalBuilder.withMaxEntrySize(maxEntrySize);
- return this;
- }
-
- /**
- * Sets the maximum number of entries per segment.
- *
- * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
- * @return The journal builder.
- * @deprecated since 3.0.2, no longer used
- */
- @Deprecated
- public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
- // ignore
- return this;
- }
-
- /**
- * Sets the journal index density.
- *
- * <p>
- * The index density is the frequency at which the position of entries written to the journal will be recorded
- * in an in-memory index for faster seeking.
- *
- * @param indexDensity the index density
- * @return the journal builder
- * @throws IllegalArgumentException if the density is not between 0 and 1
- */
- public Builder<E> withIndexDensity(final double indexDensity) {
- byteJournalBuilder.withIndexDensity(indexDensity);
- return this;
- }
-
- /**
- * Enables flushing buffers to disk when entries are committed to a segment.
- *
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
- * entry is committed in a given segment.
- *
- * @return The journal builder.
- */
- public Builder<E> withFlushOnCommit() {
- return withFlushOnCommit(true);
- }
-
- /**
- * Enables flushing buffers to disk when entries are committed to a segment.
- *
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
- * entry is committed in a given segment.
- *
- * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
- * @return The journal builder.
- */
- public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
- byteJournalBuilder.withFlushOnCommit(flushOnCommit);
- return this;
- }
-
- /**
- * Build the {@link SegmentedJournal}.
- *
- * @return {@link SegmentedJournal} instance.
- */
- public SegmentedJournal<E> build() {
- return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
- }
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("journal", journal).toString();
}
}
return runs;
}
- protected SegmentedJournal<TestEntry> createJournal() {
- return SegmentedJournal.<TestEntry>builder()
+ private SegmentedJournal<TestEntry> createJournal() {
+ return new SegmentedJournal<>(SegmentedByteBufJournal.builder()
.withName("test")
.withDirectory(PATH.toFile())
- .withNamespace(NAMESPACE)
.withStorageLevel(storageLevel)
.withMaxSegmentSize(maxSegmentSize)
.withIndexDensity(.2)
- .build();
+ .build(), NAMESPACE.toMapper());
}
@Test
import io.atomix.storage.journal.JournalReader;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.JournalWriter;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
super(persistenceId, messageSize);
- entries = SegmentedJournal.<DataJournalEntry>builder()
- .withStorageLevel(storage).withDirectory(directory).withName("data")
- .withNamespace(JournalSerdes.builder()
- .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
- .build())
- .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize)
- .build();
+ entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
+ .withDirectory(directory)
+ .withName("data")
+ .withStorageLevel(storage)
+ .withMaxEntrySize(maxEntrySize)
+ .withMaxSegmentSize(maxSegmentSize)
+ .build(),
+ JournalSerdes.builder()
+ .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
+ .build().toMapper());
}
@Override
import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
+import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
}
final var sw = Stopwatch.createStarted();
- deleteJournal = SegmentedJournal.<Long>builder()
+ deleteJournal = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
.withDirectory(directory)
.withName("delete")
- .withNamespace(DELETE_NAMESPACE)
.withMaxSegmentSize(DELETE_SEGMENT_SIZE)
- .build();
+ .build(), DELETE_NAMESPACE.toMapper());
final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
.tryNext((index, value, length) -> value);
- lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered.longValue();
+ lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);