2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
22 import java.util.concurrent.atomic.AtomicBoolean;
// Journal implementation for entries of type E. Its constructor combines a
// SegmentedByteBufJournal with a ByteBufMapper<E>.
27 public final class SegmentedJournal<E> implements Journal<E> {
// Open/closed flag; starts out true.
28 private final AtomicBoolean open = new AtomicBoolean(true);
// Underlying journal that reader-opening and compaction calls are forwarded to.
29 private final SegmentedByteBufJournal journal;
// Typed writer, created once in the constructor from journal.writer() and the mapper.
30 private final SegmentedJournalWriter<E> writer;
// Mapper handed to the writer and to every reader created by this class.
31 private final ByteBufMapper<E> mapper;
// Creates a journal on top of the given byte-level journal and entry mapper.
// Both arguments are null-checked with requireNonNull, so a null argument fails here with a
// NullPointerException carrying the message given on the corresponding line.
33 public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
34 this.journal = requireNonNull(journal, "journal is required");
35 this.mapper = requireNonNull(mapper, "mapper cannot be null");
// The typed writer is created eagerly, once, from the byte journal's own writer and the mapper.
36 writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
// Accessor for this journal's writer, exposed as JournalWriter<E>.
// NOTE(review): only the declaration line is visible in this view; the body is presumably a
// plain return of the `writer` field - confirm against the full file.
40 public JournalWriter<E> writer() {
// Single-argument overload: delegates to openReader(index, mode) with JournalReader.Mode.ALL.
45 public JournalReader<E> openReader(final long index) {
46 return openReader(index, JournalReader.Mode.ALL);
50 * Opens a new journal reader with the given reader mode.
52 * @param index The index from which to begin reading entries.
53 * @param mode The mode in which to read entries.
54 * @return The journal reader.
// Opens a reader for the given index, picking the byte-level reader according to the mode.
57 public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
// Each mode maps to a different opener on the underlying byte journal. A null mode makes the
// switch itself throw NullPointerException.
// NOTE(review): no default branch is visible; presumably ALL and COMMITS are the only Mode
// constants - confirm.
58 final var byteReader = switch (mode) {
59 case ALL -> journal.openReader(index);
60 case COMMITS -> journal.openCommitsReader(index);
// Pair the byte-level reader with the mapper in a SegmentedJournalReader.
62 return new SegmentedJournalReader<>(byteReader, mapper);
// Reports whether this journal is open.
// NOTE(review): the body is not visible in this view; presumably it reads the `open` flag -
// confirm against the full file.
66 public boolean isOpen() {
// NOTE(review): the enclosing method's declaration and the guarded statements are not visible
// in this view; this looks like the guard inside close() - confirm against the full file.
// compareAndExchange returns the value witnessed before the operation, so the condition is
// true only for the caller that actually flips `open` from true to false; later callers
// witness false and skip the guarded block.
72 if (open.compareAndExchange(true, false)) {
78 * Compacts the journal up to the given index.
80 * The semantics of compaction are not specified by this interface.
82 * @param index The index up to which to compact the journal.
// Forwards the compaction request, with the index unchanged, to the underlying byte journal.
84 public void compact(final long index) {
85 journal.compact(index);
89 * Returns a new segmented journal builder.
91 * @return A new segmented journal builder.
// Static factory: returns a new Builder instance on every call.
93 public static <E> Builder<E> builder() {
94 return new Builder<>();
// Builder for SegmentedJournal. The settings visible here (name, directory, storage level,
// sizes, index density, flush-on-commit) are forwarded to a SegmentedByteBufJournal.Builder;
// only the mapper is kept in this class.
97 public static final class Builder<E> {
// Delegate builder that receives the forwarded settings and produces the byte-level journal.
98 private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
// Entry mapper; stays null until withMapper (directly or through withNamespace) assigns it.
99 private ByteBufMapper<E> mapper;
106 * Sets the journal name.
108 * @param name The journal name.
109 * @return The journal builder.
// Forwards the journal name to the underlying byte-journal builder.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
111 public Builder<E> withName(final String name) {
112 byteJournalBuilder.withName(name);
117 * Sets the journal storage level.
119 * The storage level indicates how individual entries will be persisted in the journal.
121 * @param storageLevel The log storage level.
122 * @return The journal builder.
// Forwards the storage level to the underlying byte-journal builder.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
124 public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
125 byteJournalBuilder.withStorageLevel(storageLevel);
130 * Sets the journal storage directory.
132 * The journal will write segment files into the provided directory.
134 * @param directory The journal storage directory.
135 * @return The journal builder.
136 * @throws NullPointerException If the {@code directory} is {@code null}
// String overload: passes the directory straight through to the underlying byte-journal
// builder. The visible code performs no validation of its own at this level.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
138 public Builder<E> withDirectory(final String directory) {
139 byteJournalBuilder.withDirectory(directory);
144 * Sets the journal storage directory.
146 * The journal will write segment files into the provided directory.
148 * @param directory The journal storage directory.
149 * @return The journal builder.
150 * @throws NullPointerException If the {@code directory} is {@code null}
// File overload: passes the directory straight through to the underlying byte-journal
// builder. The visible code performs no validation of its own at this level.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
152 public Builder<E> withDirectory(final File directory) {
153 byteJournalBuilder.withDirectory(directory);
158 * Sets the journal namespace.
160 * @param namespace The journal serializer.
161 * @return The journal builder.
162 * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
// Deprecated entry point, marked for removal: converts the JournalSerdes into a mapper via
// toMapper() and delegates to withMapper. A null namespace fails immediately with a
// NullPointerException whose message is "namespace cannot be null".
164 @Deprecated(forRemoval = true, since="9.0.3")
165 public Builder<E> withNamespace(final JournalSerdes namespace) {
166 return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
170 * Sets the journal serializer.
172 * @param mapper Journal serializer
173 * @return The journal builder
// Stores the entry mapper in this builder. A null mapper is rejected right away by
// requireNonNull (no custom message is supplied here).
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
175 public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
176 this.mapper = requireNonNull(mapper);
181 * Sets the maximum segment size in bytes.
183 * The maximum segment size dictates when the journal should roll over to new segments. As entries are written
184 * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
185 * journal will create a new segment and append new entries to that segment.
187 * By default, the maximum segment size is 32M.
189 * @param maxSegmentSize The maximum segment size in bytes.
190 * @return The journal builder.
191 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
// Forwards the maximum segment size to the underlying byte-journal builder. No range check is
// performed in the visible code at this level.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
193 public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
194 byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
199 * Sets the maximum entry size in bytes.
201 * @param maxEntrySize the maximum entry size in bytes
202 * @return the journal builder
203 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
// Forwards the maximum entry size to the underlying byte-journal builder. No range check is
// performed in the visible code at this level.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
205 public Builder<E> withMaxEntrySize(final int maxEntrySize) {
206 byteJournalBuilder.withMaxEntrySize(maxEntrySize);
211 * Sets the maximum number of entries per segment.
213 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
214 * @return The journal builder.
215 * @deprecated since 3.0.2, no longer used
// Setter for the maximum number of entries per segment; the accompanying doc text marks it as
// deprecated and no longer used.
// NOTE(review): only the declaration line is visible in this view; presumably the argument is
// ignored and the builder itself is returned - confirm against the full file.
218 public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
224 * Sets the journal index density.
226 * The index density is the frequency at which the position of entries written to the journal will be recorded
227 * in an in-memory index for faster seeking.
229 * @param indexDensity the index density
230 * @return the journal builder
231 * @throws IllegalArgumentException if the density is not between 0 and 1
// Forwards the index density to the underlying byte-journal builder. No range check is
// performed in the visible code at this level.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
233 public Builder<E> withIndexDensity(final double indexDensity) {
234 byteJournalBuilder.withIndexDensity(indexDensity);
239 * Enables flushing buffers to disk when entries are committed to a segment.
241 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
242 * entry is committed in a given segment.
244 * @return The journal builder.
// No-argument convenience overload: identical to calling withFlushOnCommit(true).
246 public Builder<E> withFlushOnCommit() {
247 return withFlushOnCommit(true);
251 * Enables flushing buffers to disk when entries are committed to a segment.
253 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
254 * entry is committed in a given segment.
256 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
257 * @return The journal builder.
// Forwards the flush-on-commit flag to the underlying byte-journal builder.
// NOTE(review): the rest of this method is not visible in this view; presumably it ends with
// `return this;` - confirm.
259 public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
260 byteJournalBuilder.withFlushOnCommit(flushOnCommit);
265 * Build the {@link SegmentedJournal}.
267 * @return {@link SegmentedJournal} instance.
// Builds the byte-level journal from the delegate builder and wraps it, together with the
// configured mapper, in a new SegmentedJournal.
// NOTE(review): constructor arguments are evaluated before the constructor body runs, so
// byteJournalBuilder.build() has already executed by the time the constructor's
// requireNonNull rejects a mapper that was never set. If that build acquires resources
// (not visible here), consider null-checking the mapper before building - confirm.
269 public SegmentedJournal<E> build() {
270 return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);