Refactor Journal interface
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.io.File;
22
23 /**
24  * Segmented journal.
25  */
26 public final class SegmentedJournal<E> implements Journal<E> {
27     private final SegmentedByteBufJournal journal;
28     private final SegmentedJournalWriter<E> writer;
29     private final ByteBufMapper<E> mapper;
30
31     public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
32         this.journal = requireNonNull(journal, "journal is required");
33         this.mapper = requireNonNull(mapper, "mapper cannot be null");
34         writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
35     }
36
37     @Override
38     public JournalWriter<E> writer() {
39         return writer;
40     }
41
42     @Override
43     public JournalReader<E> openReader(final long index) {
44         return openReader(index, JournalReader.Mode.ALL);
45     }
46
47     /**
48      * Opens a new journal reader with the given reader mode.
49      *
50      * @param index The index from which to begin reading entries.
51      * @param mode The mode in which to read entries.
52      * @return The journal reader.
53      */
54     @Override
55     public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
56         final var byteReader = switch (mode) {
57             case ALL -> journal.openReader(index);
58             case COMMITS -> journal.openCommitsReader(index);
59         };
60         return new SegmentedJournalReader<>(byteReader, mapper);
61     }
62
63     @Override
64     public void close() {
65         journal.close();
66     }
67
68     /**
69      * Compacts the journal up to the given index.
70      * <p>
71      * The semantics of compaction are not specified by this interface.
72      *
73      * @param index The index up to which to compact the journal.
74      */
75     public void compact(final long index) {
76         journal.compact(index);
77     }
78
79     /**
80      * Returns a new segmented journal builder.
81      *
82      * @return A new segmented journal builder.
83      */
84     public static <E> Builder<E> builder() {
85         return new Builder<>();
86     }
87
88     public static final class Builder<E> {
89         private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
90         private ByteBufMapper<E> mapper;
91
92         private Builder() {
93             // on purpose
94         }
95
96         /**
97          * Sets the journal name.
98          *
99          * @param name The journal name.
100          * @return The journal builder.
101          */
102         public Builder<E> withName(final String name) {
103             byteJournalBuilder.withName(name);
104             return this;
105         }
106
107         /**
108          * Sets the journal storage level.
109          * <p>
110          * The storage level indicates how individual entries will be persisted in the journal.
111          *
112          * @param storageLevel The log storage level.
113          * @return The journal builder.
114          */
115         public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
116             byteJournalBuilder.withStorageLevel(storageLevel);
117             return this;
118         }
119
120         /**
121          * Sets the journal storage directory.
122          * <p>
123          * The journal will write segment files into the provided directory.
124          *
125          * @param directory The journal storage directory.
126          * @return The journal builder.
127          * @throws NullPointerException If the {@code directory} is {@code null}
128          */
129         public Builder<E> withDirectory(final String directory) {
130             byteJournalBuilder.withDirectory(directory);
131             return this;
132         }
133
134         /**
135          * Sets the journal storage directory.
136          * <p>
137          * The journal will write segment files into the provided directory.
138          *
139          * @param directory The journal storage directory.
140          * @return The journal builder.
141          * @throws NullPointerException If the {@code directory} is {@code null}
142          */
143         public Builder<E> withDirectory(final File directory) {
144              byteJournalBuilder.withDirectory(directory);
145             return this;
146         }
147
148         /**
149          * Sets the journal namespace.
150          *
151          * @param namespace The journal serializer.
152          * @return The journal builder.
153          * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
154          */
155         @Deprecated(forRemoval = true, since="9.0.3")
156         public Builder<E> withNamespace(final JournalSerdes namespace) {
157             return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
158         }
159
160         /**
161          * Sets journal serializer.
162          *
163          * @param mapper Journal serializer
164          * @return The journal builder
165          */
166         public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
167             this.mapper = requireNonNull(mapper);
168             return this;
169         }
170
171         /**
172          * Sets the maximum segment size in bytes.
173          * <p>
174          * The maximum segment size dictates when journal should roll over to new segments. As entries are written
175          * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
176          * journal will create a new segment and append new entries to that segment.
177          * <p>
178          * By default, the maximum segment size is 32M.
179          *
180          * @param maxSegmentSize The maximum segment size in bytes.
181          * @return The storage builder.
182          * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
183          */
184         public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
185             byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
186             return this;
187         }
188
189         /**
190          * Sets the maximum entry size in bytes.
191          *
192          * @param maxEntrySize the maximum entry size in bytes
193          * @return the storage builder
194          * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
195          */
196         public Builder<E> withMaxEntrySize(final int maxEntrySize) {
197             byteJournalBuilder.withMaxEntrySize(maxEntrySize);
198             return this;
199         }
200
201         /**
202          * Sets the maximum number of entries per segment.
203          *
204          * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
205          * @return The journal builder.
206          * @deprecated since 3.0.2, no longer used
207          */
208         @Deprecated
209         public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
210             // ignore
211             return this;
212         }
213
214         /**
215          * Sets the journal index density.
216          * <p>
217          * The index density is the frequency at which the position of entries written to the journal will be recorded
218          * in an in-memory index for faster seeking.
219          *
220          * @param indexDensity the index density
221          * @return the journal builder
222          * @throws IllegalArgumentException if the density is not between 0 and 1
223          */
224         public Builder<E> withIndexDensity(final double indexDensity) {
225             byteJournalBuilder.withIndexDensity(indexDensity);
226             return this;
227         }
228
229         /**
230          * Enables flushing buffers to disk when entries are committed to a segment.
231          * <p>
232          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
233          * entry is committed in a given segment.
234          *
235          * @return The journal builder.
236          */
237         public Builder<E> withFlushOnCommit() {
238             return withFlushOnCommit(true);
239         }
240
241         /**
242          * Enables flushing buffers to disk when entries are committed to a segment.
243          * <p>
244          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
245          * entry is committed in a given segment.
246          *
247          * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
248          * @return The journal builder.
249          */
250         public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
251             byteJournalBuilder.withFlushOnCommit(flushOnCommit);
252             return this;
253         }
254
255         /**
256          * Build the {@link SegmentedJournal}.
257          *
258          * @return {@link SegmentedJournal} instance.
259          */
260         public SegmentedJournal<E> build() {
261             return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
262         }
263     }
264 }