Separate byte-level atomic-storage access
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.io.File;
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 /**
25  * Segmented journal.
26  */
27 public final class SegmentedJournal<E> implements Journal<E> {
28     private final AtomicBoolean open = new AtomicBoolean(true);
29     private final SegmentedByteBufJournal journal;
30     private final SegmentedJournalWriter<E> writer;
31     private final ByteBufMapper<E> mapper;
32
33     public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
34         this.journal = requireNonNull(journal, "journal is required");
35         this.mapper = requireNonNull(mapper, "mapper cannot be null");
36         writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
37     }
38
39     @Override
40     public JournalWriter<E> writer() {
41         return writer;
42     }
43
44     @Override
45     public JournalReader<E> openReader(final long index) {
46         return openReader(index, JournalReader.Mode.ALL);
47     }
48
49     /**
50      * Opens a new journal reader with the given reader mode.
51      *
52      * @param index The index from which to begin reading entries.
53      * @param mode The mode in which to read entries.
54      * @return The journal reader.
55      */
56     @Override
57     public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
58         final var byteReader = switch (mode) {
59             case ALL -> journal.openReader(index);
60             case COMMITS -> journal.openCommitsReader(index);
61         };
62         return new SegmentedJournalReader<>(byteReader, mapper);
63     }
64
65     @Override
66     public boolean isOpen() {
67         return open.get();
68     }
69
70     @Override
71     public void close() {
72         if (open.compareAndExchange(true, false)) {
73             journal.close();
74         }
75     }
76
77     /**
78      * Compacts the journal up to the given index.
79      * <p>
80      * The semantics of compaction are not specified by this interface.
81      *
82      * @param index The index up to which to compact the journal.
83      */
84     public void compact(final long index) {
85         journal.compact(index);
86     }
87
88     /**
89      * Returns a new segmented journal builder.
90      *
91      * @return A new segmented journal builder.
92      */
93     public static <E> Builder<E> builder() {
94         return new Builder<>();
95     }
96
97     public static final class Builder<E> {
98         private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
99         private ByteBufMapper<E> mapper;
100
101         private Builder() {
102             // on purpose
103         }
104
105         /**
106          * Sets the journal name.
107          *
108          * @param name The journal name.
109          * @return The journal builder.
110          */
111         public Builder<E> withName(final String name) {
112             byteJournalBuilder.withName(name);
113             return this;
114         }
115
116         /**
117          * Sets the journal storage level.
118          * <p>
119          * The storage level indicates how individual entries will be persisted in the journal.
120          *
121          * @param storageLevel The log storage level.
122          * @return The journal builder.
123          */
124         public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
125             byteJournalBuilder.withStorageLevel(storageLevel);
126             return this;
127         }
128
129         /**
130          * Sets the journal storage directory.
131          * <p>
132          * The journal will write segment files into the provided directory.
133          *
134          * @param directory The journal storage directory.
135          * @return The journal builder.
136          * @throws NullPointerException If the {@code directory} is {@code null}
137          */
138         public Builder<E> withDirectory(final String directory) {
139             byteJournalBuilder.withDirectory(directory);
140             return this;
141         }
142
143         /**
144          * Sets the journal storage directory.
145          * <p>
146          * The journal will write segment files into the provided directory.
147          *
148          * @param directory The journal storage directory.
149          * @return The journal builder.
150          * @throws NullPointerException If the {@code directory} is {@code null}
151          */
152         public Builder<E> withDirectory(final File directory) {
153              byteJournalBuilder.withDirectory(directory);
154             return this;
155         }
156
157         /**
158          * Sets the journal namespace.
159          *
160          * @param namespace The journal serializer.
161          * @return The journal builder.
162          * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
163          */
164         @Deprecated(forRemoval = true, since="9.0.3")
165         public Builder<E> withNamespace(final JournalSerdes namespace) {
166             return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
167         }
168
169         /**
170          * Sets journal serializer.
171          *
172          * @param mapper Journal serializer
173          * @return The journal builder
174          */
175         public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
176             this.mapper = requireNonNull(mapper);
177             return this;
178         }
179
180         /**
181          * Sets the maximum segment size in bytes.
182          * <p>
183          * The maximum segment size dictates when journal should roll over to new segments. As entries are written
184          * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
185          * journal will create a new segment and append new entries to that segment.
186          * <p>
187          * By default, the maximum segment size is 32M.
188          *
189          * @param maxSegmentSize The maximum segment size in bytes.
190          * @return The storage builder.
191          * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
192          */
193         public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
194             byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
195             return this;
196         }
197
198         /**
199          * Sets the maximum entry size in bytes.
200          *
201          * @param maxEntrySize the maximum entry size in bytes
202          * @return the storage builder
203          * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
204          */
205         public Builder<E> withMaxEntrySize(final int maxEntrySize) {
206             byteJournalBuilder.withMaxEntrySize(maxEntrySize);
207             return this;
208         }
209
210         /**
211          * Sets the maximum number of entries per segment.
212          *
213          * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
214          * @return The journal builder.
215          * @deprecated since 3.0.2, no longer used
216          */
217         @Deprecated
218         public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
219             // ignore
220             return this;
221         }
222
223         /**
224          * Sets the journal index density.
225          * <p>
226          * The index density is the frequency at which the position of entries written to the journal will be recorded
227          * in an in-memory index for faster seeking.
228          *
229          * @param indexDensity the index density
230          * @return the journal builder
231          * @throws IllegalArgumentException if the density is not between 0 and 1
232          */
233         public Builder<E> withIndexDensity(final double indexDensity) {
234             byteJournalBuilder.withIndexDensity(indexDensity);
235             return this;
236         }
237
238         /**
239          * Enables flushing buffers to disk when entries are committed to a segment.
240          * <p>
241          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
242          * entry is committed in a given segment.
243          *
244          * @return The journal builder.
245          */
246         public Builder<E> withFlushOnCommit() {
247             return withFlushOnCommit(true);
248         }
249
250         /**
251          * Enables flushing buffers to disk when entries are committed to a segment.
252          * <p>
253          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
254          * entry is committed in a given segment.
255          *
256          * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
257          * @return The journal builder.
258          */
259         public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
260             byteJournalBuilder.withFlushOnCommit(flushOnCommit);
261             return this;
262         }
263
264         /**
265          * Build the {@link SegmentedJournal}.
266          *
267          * @return {@link SegmentedJournal} instance.
268          */
269         public SegmentedJournal<E> build() {
270             return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
271         }
272     }
273 }