/*
- * Copyright 2017-present Open Networking Foundation
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package io.atomix.storage.journal;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.RaftJournal;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
+
/**
- * Segmented journal.
+ * A {@link Journal} implementation based on a {@link RaftJournal}.
*/
public final class SegmentedJournal<E> implements Journal<E> {
+ private final @NonNull SegmentedJournalWriter<E> writer;
+ private final @NonNull FromByteBufMapper<E> readMapper;
+ private final @NonNull RaftJournal journal;
- /**
- * Returns a new Raft log builder.
- *
- * @return A new Raft log builder.
- */
- public static <E> Builder<E> builder() {
- return new Builder<>();
- }
-
- private static final int SEGMENT_BUFFER_FACTOR = 3;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final String name;
- private final StorageLevel storageLevel;
- private final File directory;
- private final JournalSerdes namespace;
- private final int maxSegmentSize;
- private final int maxEntrySize;
- private final int maxEntriesPerSegment;
- private final double indexDensity;
- private final boolean flushOnCommit;
- private final SegmentedJournalWriter<E> writer;
- private volatile long commitIndex;
-
- private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
- private JournalSegment<E> currentSegment;
-
- private volatile boolean open = true;
-
- public SegmentedJournal(
- String name,
- StorageLevel storageLevel,
- File directory,
- JournalSerdes namespace,
- int maxSegmentSize,
- int maxEntrySize,
- int maxEntriesPerSegment,
- double indexDensity,
- boolean flushOnCommit) {
- this.name = requireNonNull(name, "name cannot be null");
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- this.directory = requireNonNull(directory, "directory cannot be null");
- this.namespace = requireNonNull(namespace, "namespace cannot be null");
- this.maxSegmentSize = maxSegmentSize;
- this.maxEntrySize = maxEntrySize;
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- this.indexDensity = indexDensity;
- this.flushOnCommit = flushOnCommit;
- open();
- this.writer = openWriter();
- }
-
- /**
- * Returns the segment file name prefix.
- *
- * @return The segment file name prefix.
- */
- public String name() {
- return name;
- }
-
- /**
- * Returns the storage directory.
- * <p>
- * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
- * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
- * when the log is opened.
- *
- * @return The storage directory.
- */
- public File directory() {
- return directory;
- }
-
- /**
- * Returns the storage level.
- * <p>
- * The storage level dictates how entries within individual journal segments should be stored.
- *
- * @return The storage level.
- */
- public StorageLevel storageLevel() {
- return storageLevel;
- }
-
- /**
- * Returns the maximum journal segment size.
- * <p>
- * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
- *
- * @return The maximum segment size in bytes.
- */
- public int maxSegmentSize() {
- return maxSegmentSize;
- }
-
- /**
- * Returns the maximum journal entry size.
- * <p>
- * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
- *
- * @return the maximum entry size in bytes
- */
- public int maxEntrySize() {
- return maxEntrySize;
- }
-
- /**
- * Returns the maximum number of entries per segment.
- * <p>
- * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
- * in a journal.
- *
- * @return The maximum number of entries per segment.
- * @deprecated since 3.0.2
- */
- @Deprecated
- public int maxEntriesPerSegment() {
- return maxEntriesPerSegment;
- }
-
- /**
- * Returns the collection of journal segments.
- *
- * @return the collection of journal segments
- */
- public Collection<JournalSegment<E>> segments() {
- return segments.values();
- }
-
- /**
- * Returns the collection of journal segments with indexes greater than the given index.
- *
- * @param index the starting index
- * @return the journal segments starting with indexes greater than or equal to the given index
- */
- public Collection<JournalSegment<E>> segments(long index) {
- return segments.tailMap(index).values();
- }
-
- /**
- * Returns the total size of the journal.
- *
- * @return the total size of the journal
- */
- public long size() {
- return segments.values().stream()
- .mapToLong(segment -> segment.size())
- .sum();
- }
-
- @Override
- public SegmentedJournalWriter<E> writer() {
- return writer;
- }
-
- @Override
- public SegmentedJournalReader<E> openReader(long index) {
- return openReader(index, SegmentedJournalReader.Mode.ALL);
- }
-
- /**
- * Opens a new Raft log reader with the given reader mode.
- *
- * @param index The index from which to begin reading entries.
- * @param mode The mode in which to read entries.
- * @return The Raft log reader.
- */
- public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
- SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
- readers.add(reader);
- return reader;
- }
-
- /**
- * Opens a new journal writer.
- *
- * @return A new journal writer.
- */
- protected SegmentedJournalWriter<E> openWriter() {
- return new SegmentedJournalWriter<>(this);
- }
-
- /**
- * Opens the segments.
- */
- private synchronized void open() {
- // Load existing log segments from disk.
- for (JournalSegment<E> segment : loadSegments()) {
- segments.put(segment.descriptor().index(), segment);
- }
-
- // If a segment doesn't already exist, create an initial segment starting at index 1.
- if (!segments.isEmpty()) {
- currentSegment = segments.lastEntry().getValue();
- } else {
- JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
- .withId(1)
- .withIndex(1)
- .withMaxSegmentSize(maxSegmentSize)
- .withMaxEntries(maxEntriesPerSegment)
- .build();
-
- currentSegment = createSegment(descriptor);
- currentSegment.descriptor().update(System.currentTimeMillis());
-
- segments.put(1L, currentSegment);
- }
- }
-
- /**
- * Asserts that the manager is open.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- private void assertOpen() {
- checkState(currentSegment != null, "journal not open");
- }
-
- /**
- * Asserts that enough disk space is available to allocate a new segment.
- */
- private void assertDiskSpace() {
- if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
- throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
- }
- }
-
- /**
- * Resets the current segment, creating a new segment if necessary.
- */
- private synchronized void resetCurrentSegment() {
- JournalSegment<E> lastSegment = getLastSegment();
- if (lastSegment != null) {
- currentSegment = lastSegment;
- } else {
- JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
- .withId(1)
- .withIndex(1)
- .withMaxSegmentSize(maxSegmentSize)
- .withMaxEntries(maxEntriesPerSegment)
- .build();
-
- currentSegment = createSegment(descriptor);
-
- segments.put(1L, currentSegment);
- }
- }
-
- /**
- * Resets and returns the first segment in the journal.
- *
- * @param index the starting index of the journal
- * @return the first segment
- */
- JournalSegment<E> resetSegments(long index) {
- assertOpen();
-
- // If the index already equals the first segment index, skip the reset.
- JournalSegment<E> firstSegment = getFirstSegment();
- if (index == firstSegment.index()) {
- return firstSegment;
- }
-
- for (JournalSegment<E> segment : segments.values()) {
- segment.close();
- segment.delete();
- }
- segments.clear();
-
- JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
- .withId(1)
- .withIndex(index)
- .withMaxSegmentSize(maxSegmentSize)
- .withMaxEntries(maxEntriesPerSegment)
- .build();
- currentSegment = createSegment(descriptor);
- segments.put(index, currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the first segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment<E> getFirstSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
- * Returns the last segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment<E> getLastSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
- * Creates and returns the next segment.
- *
- * @return The next segment.
- * @throws IllegalStateException if the segment manager is not open
- */
- synchronized JournalSegment<E> getNextSegment() {
- assertOpen();
- assertDiskSpace();
-
- JournalSegment<E> lastSegment = getLastSegment();
- JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
- .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
- .withIndex(currentSegment.lastIndex() + 1)
- .withMaxSegmentSize(maxSegmentSize)
- .withMaxEntries(maxEntriesPerSegment)
- .build();
-
- currentSegment = createSegment(descriptor);
-
- segments.put(descriptor.index(), currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the segment following the segment with the given ID.
- *
- * @param index The segment index with which to look up the next segment.
- * @return The next segment for the given index.
- */
- JournalSegment<E> getNextSegment(long index) {
- Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
- return nextSegment != null ? nextSegment.getValue() : null;
- }
-
- /**
- * Returns the segment for the given index.
- *
- * @param index The index for which to return the segment.
- * @throws IllegalStateException if the segment manager is not open
- */
- synchronized JournalSegment<E> getSegment(long index) {
- assertOpen();
- // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
- if (currentSegment != null && index > currentSegment.index()) {
- return currentSegment;
- }
-
- // If the index is in another segment, get the entry with the next lowest first index.
- Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
- if (segment != null) {
- return segment.getValue();
- }
- return getFirstSegment();
- }
-
- /**
- * Removes a segment.
- *
- * @param segment The segment to remove.
- */
- synchronized void removeSegment(JournalSegment<E> segment) {
- segments.remove(segment.index());
- segment.close();
- segment.delete();
- resetCurrentSegment();
- }
-
- /**
- * Creates a new segment.
- */
- JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
- File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
-
- RandomAccessFile raf;
- FileChannel channel;
- try {
- raf = new RandomAccessFile(segmentFile, "rw");
- raf.setLength(descriptor.maxSegmentSize());
- channel = raf.getChannel();
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
- descriptor.copyTo(buffer);
- buffer.flip();
- try {
- channel.write(buffer);
- } catch (IOException e) {
- throw new StorageException(e);
- } finally {
- try {
- channel.close();
- raf.close();
- } catch (IOException e) {
- }
- }
- JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
- log.debug("Created segment: {}", segment);
- return segment;
- }
-
- /**
- * Creates a new segment instance.
- *
- * @param segmentFile The segment file.
- * @param descriptor The segment descriptor.
- * @return The segment instance.
- */
- protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
- return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
- }
-
- /**
- * Loads a segment.
- */
- private JournalSegment<E> loadSegment(long segmentId) {
- File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
- ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
- try (FileChannel channel = openChannel(segmentFile)) {
- channel.read(buffer);
- buffer.flip();
- JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
- JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
- log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
- return segment;
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
- private FileChannel openChannel(File file) {
- try {
- return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
- /**
- * Loads all segments from disk.
- *
- * @return A collection of segments for the log.
- */
- protected Collection<JournalSegment<E>> loadSegments() {
- // Ensure log directories are created.
- directory.mkdirs();
-
- TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
-
- // Iterate through all files in the log directory.
- for (File file : directory.listFiles(File::isFile)) {
-
- // If the file looks like a segment file, attempt to load the segment.
- if (JournalSegmentFile.isSegmentFile(name, file)) {
- JournalSegmentFile segmentFile = new JournalSegmentFile(file);
- ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
- try (FileChannel channel = openChannel(file)) {
- channel.read(buffer);
- buffer.flip();
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
-
- // Load the segment.
- JournalSegment<E> segment = loadSegment(descriptor.id());
-
- // Add the segment to the segments list.
- log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
- segments.put(segment.index(), segment);
- }
+ public SegmentedJournal(final RaftJournal journal, final FromByteBufMapper<E> readMapper,
+ final ToByteBufMapper<E> writeMapper) {
+ this.journal = requireNonNull(journal, "journal is required");
+ this.readMapper = requireNonNull(readMapper, "readMapper cannot be null");
+ writer = new SegmentedJournalWriter<>(journal.writer(),
+ requireNonNull(writeMapper, "writeMapper cannot be null"));
}
- // Verify that all the segments in the log align with one another.
- JournalSegment<E> previousSegment = null;
- boolean corrupted = false;
- Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
- while (iterator.hasNext()) {
- JournalSegment<E> segment = iterator.next().getValue();
- if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
- log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
- corrupted = true;
- }
- if (corrupted) {
- segment.close();
- segment.delete();
- iterator.remove();
- }
- previousSegment = segment;
+ @Override
+ public long firstIndex() {
+ return journal.firstIndex();
}
- return segments.values();
- }
-
- /**
- * Resets journal readers to the given head.
- *
- * @param index The index at which to reset readers.
- */
- void resetHead(long index) {
- for (SegmentedJournalReader<E> reader : readers) {
- if (reader.getNextIndex() < index) {
- reader.reset(index);
- }
+ @Override
+ public long lastIndex() {
+ return journal.lastIndex();
}
- }
- /**
- * Resets journal readers to the given tail.
- *
- * @param index The index at which to reset readers.
- */
- void resetTail(long index) {
- for (SegmentedJournalReader<E> reader : readers) {
- if (reader.getNextIndex() >= index) {
- reader.reset(index);
- }
+ @Override
+ public JournalWriter<E> writer() {
+ return writer;
}
- }
-
- void closeReader(SegmentedJournalReader<E> reader) {
- readers.remove(reader);
- }
- @Override
- public boolean isOpen() {
- return open;
- }
-
- /**
- * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
- *
- * @param index the index from which to remove segments
- * @return indicates whether a segment can be removed from the journal
- */
- public boolean isCompactable(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
- }
-
- /**
- * Returns the index of the last segment in the log.
- *
- * @param index the compaction index
- * @return the starting index of the last segment in the log
- */
- public long getCompactableIndex(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null ? segmentEntry.getValue().index() : 0;
- }
-
- /**
- * Compacts the journal up to the given index.
- * <p>
- * The semantics of compaction are not specified by this interface.
- *
- * @param index The index up to which to compact the journal.
- */
- public void compact(long index) {
- Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
- if (segmentEntry != null) {
- SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
- if (!compactSegments.isEmpty()) {
- log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- for (JournalSegment<E> segment : compactSegments.values()) {
- log.trace("Deleting segment: {}", segment);
- segment.close();
- segment.delete();
- }
- compactSegments.clear();
- resetHead(segmentEntry.getValue().index());
- }
- }
- }
-
- @Override
- public void close() {
- segments.values().forEach(segment -> {
- log.debug("Closing segment: {}", segment);
- segment.close();
- });
- currentSegment = null;
- open = false;
- }
-
- /**
- * Returns whether {@code flushOnCommit} is enabled for the log.
- *
- * @return Indicates whether {@code flushOnCommit} is enabled for the log.
- */
- boolean isFlushOnCommit() {
- return flushOnCommit;
- }
-
- /**
- * Commits entries up to the given index.
- *
- * @param index The index up to which to commit entries.
- */
- void setCommitIndex(long index) {
- this.commitIndex = index;
- }
-
- /**
- * Returns the Raft log commit index.
- *
- * @return The Raft log commit index.
- */
- long getCommitIndex() {
- return commitIndex;
- }
-
- /**
- * Raft log builder.
- */
- public static final class Builder<E> {
- private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
- private static final String DEFAULT_NAME = "atomix";
- private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
- private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
- private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
- private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
- private static final double DEFAULT_INDEX_DENSITY = .005;
-
- private String name = DEFAULT_NAME;
- private StorageLevel storageLevel = StorageLevel.DISK;
- private File directory = new File(DEFAULT_DIRECTORY);
- private JournalSerdes namespace;
- private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
- private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
- private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
- private double indexDensity = DEFAULT_INDEX_DENSITY;
- private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
-
- protected Builder() {
+ @Override
+ public JournalReader<E> openReader(final long index) {
+ return openReader(index, JournalReader.Mode.ALL);
}
/**
- * Sets the storage name.
+ * Opens a new journal reader with the given reader mode.
*
- * @param name The storage name.
- * @return The storage builder.
+ * @param index The index from which to begin reading entries.
+ * @param mode The mode in which to read entries.
+ * @return The journal reader.
*/
- public Builder<E> withName(String name) {
- this.name = requireNonNull(name, "name cannot be null");
- return this;
+ @Override
+ public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
+ final var byteReader = switch (mode) {
+ case ALL -> journal.openReader(index);
+ case COMMITS -> journal.openCommitsReader(index);
+ };
+ return new SegmentedJournalReader<>(byteReader, readMapper);
}
- /**
- * Sets the log storage level, returning the builder for method chaining.
- * <p>
- * The storage level indicates how individual entries should be persisted in the journal.
- *
- * @param storageLevel The log storage level.
- * @return The storage builder.
- */
- public Builder<E> withStorageLevel(StorageLevel storageLevel) {
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- return this;
- }
-
- /**
- * Sets the log directory, returning the builder for method chaining.
- * <p>
- * The log will write segment files into the provided directory.
- *
- * @param directory The log directory.
- * @return The storage builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
- */
- public Builder<E> withDirectory(String directory) {
- return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
- }
-
- /**
- * Sets the log directory, returning the builder for method chaining.
- * <p>
- * The log will write segment files into the provided directory.
- *
- * @param directory The log directory.
- * @return The storage builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
- */
- public Builder<E> withDirectory(File directory) {
- this.directory = requireNonNull(directory, "directory cannot be null");
- return this;
+ @Override
+ public void compact(final long index) {
+ journal.compact(index);
}
- /**
- * Sets the journal namespace, returning the builder for method chaining.
- *
- * @param namespace The journal serializer.
- * @return The journal builder.
- */
- public Builder<E> withNamespace(JournalSerdes namespace) {
- this.namespace = requireNonNull(namespace, "namespace cannot be null");
- return this;
+ @Override
+ public void close() {
+ journal.close();
}
- /**
- * Sets the maximum segment size in bytes, returning the builder for method chaining.
- * <p>
- * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
- * segment and append new entries to that segment.
- * <p>
- * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
- *
- * @param maxSegmentSize The maximum segment size in bytes.
- * @return The storage builder.
- * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
- */
- public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
- checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
- this.maxSegmentSize = maxSegmentSize;
- return this;
- }
-
- /**
- * Sets the maximum entry size in bytes, returning the builder for method chaining.
- *
- * @param maxEntrySize the maximum entry size in bytes
- * @return the storage builder
- * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
- */
- public Builder<E> withMaxEntrySize(int maxEntrySize) {
- checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
- this.maxEntrySize = maxEntrySize;
- return this;
- }
-
- /**
- * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
- * <p>
- * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
- * new segment and append new entries to that segment.
- * <p>
- * By default, the maximum entries per segment is {@code 1024 * 1024}.
- *
- * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
- * @return The storage builder.
- * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
- * per segment
- * @deprecated since 3.0.2
- */
- @Deprecated
- public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
- checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
- checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
- "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- return this;
- }
-
- /**
- * Sets the journal index density.
- * <p>
- * The index density is the frequency at which the position of entries written to the journal will be recorded in an
- * in-memory index for faster seeking.
- *
- * @param indexDensity the index density
- * @return the journal builder
- * @throws IllegalArgumentException if the density is not between 0 and 1
- */
- public Builder<E> withIndexDensity(double indexDensity) {
- checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
- this.indexDensity = indexDensity;
- return this;
- }
-
- /**
- * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
- * chaining.
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
- * committed in a given segment.
- *
- * @return The storage builder.
- */
- public Builder<E> withFlushOnCommit() {
- return withFlushOnCommit(true);
- }
-
- /**
- * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
- * chaining.
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
- * committed in a given segment.
- *
- * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
- * @return The storage builder.
- */
- public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
- this.flushOnCommit = flushOnCommit;
- return this;
- }
-
- /**
- * Build the {@link SegmentedJournal}.
- *
- * @return A new {@link SegmentedJournal}.
- */
- public SegmentedJournal<E> build() {
- return new SegmentedJournal<>(
- name,
- storageLevel,
- directory,
- namespace,
- maxSegmentSize,
- maxEntrySize,
- maxEntriesPerSegment,
- indexDensity,
- flushOnCommit);
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("journal", journal).toString();
}
- }
}