Separate byte-level atomic-storage access
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkState;
21 import static java.util.Objects.requireNonNull;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import java.util.function.BiFunction;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * A {@link ByteBufJournal} Implementation.
38  */
39 public final class SegmentedByteBufJournal implements ByteBufJournal {
40     private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
41     private static final int SEGMENT_BUFFER_FACTOR = 3;
42
43     private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
44     private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
45     private final String name;
46     private final StorageLevel storageLevel;
47     private final File directory;
48     private final int maxSegmentSize;
49     private final int maxEntrySize;
50     private final double indexDensity;
51     private final boolean flushOnCommit;
52     private final @NonNull ByteBufWriter writer;
53
54     private JournalSegment currentSegment;
55     private volatile long commitIndex;
56
57     public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
58             final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
59         this.name = requireNonNull(name, "name cannot be null");
60         this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
61         this.directory = requireNonNull(directory, "directory cannot be null");
62         this.maxSegmentSize = maxSegmentSize;
63         this.maxEntrySize = maxEntrySize;
64         this.indexDensity = indexDensity;
65         this.flushOnCommit = flushOnCommit;
66         open();
67         writer = new SegmentedByteBufWriter(this);
68     }
69
70     /**
71      * Returns the total size of the journal.
72      *
73      * @return the total size of the journal
74      */
75     public long size() {
76         return segments.values().stream()
77             .mapToLong(segment -> {
78                 try {
79                     return segment.file().size();
80                 } catch (IOException e) {
81                     throw new StorageException(e);
82                 }
83             })
84             .sum();
85     }
86
87     @Override
88     public ByteBufWriter writer() {
89         return writer;
90     }
91
92     @Override
93     public ByteBufReader openReader(final long index) {
94         return openReader(index, SegmentedByteBufReader::new);
95     }
96
97     @NonNullByDefault
98     private ByteBufReader openReader(final long index,
99             final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
100         final var reader = constructor.apply(this, segment(index));
101         reader.reset(index);
102         readers.add(reader);
103         return reader;
104     }
105
106     @Override
107     public ByteBufReader openCommitsReader(final long index) {
108         return openReader(index, SegmentedCommitsByteBufReader::new);
109     }
110
111     /**
112      * Opens the segments.
113      */
114     private synchronized void open() {
115         // Load existing log segments from disk.
116         for (var segment : loadSegments()) {
117             segments.put(segment.firstIndex(), segment);
118         }
119         // If a segment doesn't already exist, create an initial segment starting at index 1.
120         if (segments.isEmpty()) {
121             currentSegment = createSegment(1, 1);
122             segments.put(1L, currentSegment);
123         }  else {
124             currentSegment = segments.lastEntry().getValue();
125         }
126     }
127
128     /**
129      * Asserts that the manager is open.
130      *
131      * @throws IllegalStateException if the segment manager is not open
132      */
133     private void assertOpen() {
134         checkState(currentSegment != null, "journal not open");
135     }
136
137     /**
138      * Asserts that enough disk space is available to allocate a new segment.
139      */
140     private void assertDiskSpace() {
141         if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
142             throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
143         }
144     }
145
146     /**
147      * Resets the current segment, creating a new segment if necessary.
148      */
149     private synchronized void resetCurrentSegment() {
150         final var lastSegment = lastSegment();
151         if (lastSegment == null) {
152             currentSegment = createSegment(1, 1);
153             segments.put(1L, currentSegment);
154         } else {
155             currentSegment = lastSegment;
156         }
157     }
158
159     /**
160      * Resets and returns the first segment in the journal.
161      *
162      * @param index the starting index of the journal
163      * @return the first segment
164      */
165     JournalSegment resetSegments(final long index) {
166         assertOpen();
167
168         // If the index already equals the first segment index, skip the reset.
169         final var firstSegment = firstSegment();
170         if (index == firstSegment.firstIndex()) {
171             return firstSegment;
172         }
173
174         segments.values().forEach(JournalSegment::delete);
175         segments.clear();
176
177         currentSegment = createSegment(1, index);
178         segments.put(index, currentSegment);
179         return currentSegment;
180     }
181
182     /**
183      * Returns the first segment in the log.
184      *
185      * @throws IllegalStateException if the segment manager is not open
186      */
187     JournalSegment firstSegment() {
188         assertOpen();
189         final var firstEntry = segments.firstEntry();
190         return firstEntry != null ? firstEntry.getValue() : nextSegment();
191     }
192
193     /**
194      * Returns the last segment in the log.
195      *
196      * @throws IllegalStateException if the segment manager is not open
197      */
198     JournalSegment lastSegment() {
199         assertOpen();
200         final var lastEntry = segments.lastEntry();
201         return lastEntry != null ? lastEntry.getValue() : nextSegment();
202     }
203
204     /**
205      * Creates and returns the next segment.
206      *
207      * @return The next segment.
208      * @throws IllegalStateException if the segment manager is not open
209      */
210     synchronized JournalSegment nextSegment() {
211         assertOpen();
212         assertDiskSpace();
213
214         final var index = currentSegment.lastIndex() + 1;
215         final var lastSegment = lastSegment();
216         currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
217         segments.put(index, currentSegment);
218         return currentSegment;
219     }
220
221     /**
222      * Returns the segment following the segment with the given ID.
223      *
224      * @param index The segment index with which to look up the next segment.
225      * @return The next segment for the given index.
226      */
227     JournalSegment nextSegment(final long index) {
228         final var higherEntry = segments.higherEntry(index);
229         return higherEntry != null ? higherEntry.getValue() : null;
230     }
231
232     /**
233      * Returns the segment for the given index.
234      *
235      * @param index The index for which to return the segment.
236      * @throws IllegalStateException if the segment manager is not open
237      */
238     synchronized JournalSegment segment(final long index) {
239         assertOpen();
240         // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
241         if (currentSegment != null && index > currentSegment.firstIndex()) {
242             return currentSegment;
243         }
244
245         // If the index is in another segment, get the entry with the next lowest first index.
246         final var segment = segments.floorEntry(index);
247         return segment != null ? segment.getValue() : firstSegment();
248     }
249
250     /**
251      * Removes a segment.
252      *
253      * @param segment The segment to remove.
254      */
255     synchronized void removeSegment(final JournalSegment segment) {
256         segments.remove(segment.firstIndex());
257         segment.delete();
258         resetCurrentSegment();
259     }
260
261     /**
262      * Creates a new segment.
263      */
264     JournalSegment createSegment(final long id, final long index) {
265         final JournalSegmentFile file;
266         try {
267             file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
268                 .withId(id)
269                 .withIndex(index)
270                 .withMaxSegmentSize(maxSegmentSize)
271                 // FIXME: propagate maxEntries
272                 .withMaxEntries(Integer.MAX_VALUE)
273                 .withUpdated(System.currentTimeMillis())
274                 .build());
275         } catch (IOException e) {
276             throw new StorageException(e);
277         }
278
279         final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
280         LOG.debug("Created segment: {}", segment);
281         return segment;
282     }
283
284     /**
285      * Loads all segments from disk.
286      *
287      * @return A collection of segments for the log.
288      */
289     protected Collection<JournalSegment> loadSegments() {
290         // Ensure log directories are created.
291         directory.mkdirs();
292
293         final var segmentsMap = new TreeMap<Long, JournalSegment>();
294
295         // Iterate through all files in the log directory.
296         for (var file : directory.listFiles(File::isFile)) {
297
298             // If the file looks like a segment file, attempt to load the segment.
299             if (JournalSegmentFile.isSegmentFile(name, file)) {
300                 final JournalSegmentFile segmentFile;
301                 try {
302                     segmentFile = JournalSegmentFile.openExisting(file.toPath());
303                 } catch (IOException e) {
304                     throw new StorageException(e);
305                 }
306
307                 // Load the segment.
308                 LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
309
310                 // Add the segment to the segments list.
311                 final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
312                 segments.put(segment.firstIndex(), segment);
313             }
314         }
315
316         // Verify that all the segments in the log align with one another.
317         JournalSegment previousSegment = null;
318         boolean corrupted = false;
319         for (var iterator =  segmentsMap.entrySet().iterator(); iterator.hasNext();  ) {
320             final var segment = iterator.next().getValue();
321             if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
322                 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
323                     previousSegment.file().path());
324                 corrupted = true;
325             }
326             if (corrupted) {
327                 segment.delete();
328                 iterator.remove();
329             }
330             previousSegment = segment;
331         }
332
333         return segmentsMap.values();
334     }
335
336     /**
337      * Resets journal readers to the given head.
338      *
339      * @param index The index at which to reset readers.
340      */
341     void resetHead(final long index) {
342         for (var reader : readers) {
343             if (reader.nextIndex() < index) {
344                 reader.reset(index);
345             }
346         }
347     }
348
349     /**
350      * Resets journal readers to the given tail.
351      *
352      * @param index The index at which to reset readers.
353      */
354     void resetTail(final long index) {
355         for (var reader : readers) {
356             if (reader.nextIndex() >= index) {
357                 reader.reset(index);
358             }
359         }
360     }
361
362     void closeReader(final SegmentedByteBufReader reader) {
363         readers.remove(reader);
364     }
365
366     /**
367      * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
368      *
369      * @param index the index from which to remove segments
370      * @return indicates whether a segment can be removed from the journal
371      */
372     public boolean isCompactable(final long index) {
373         final var segmentEntry = segments.floorEntry(index);
374         return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
375     }
376
377     /**
378      * Returns the index of the last segment in the log.
379      *
380      * @param index the compaction index
381      * @return the starting index of the last segment in the log
382      */
383     public long getCompactableIndex(final long index) {
384         final var segmentEntry = segments.floorEntry(index);
385         return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
386     }
387
388     /**
389      * Compacts the journal up to the given index.
390      * <p>
391      * The semantics of compaction are not specified by this interface.
392      *
393      * @param index The index up to which to compact the journal.
394      */
395     public void compact(final long index) {
396         final var segmentEntry = segments.floorEntry(index);
397         if (segmentEntry != null) {
398             final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
399             if (!compactSegments.isEmpty()) {
400                 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
401                 compactSegments.values().forEach(JournalSegment::delete);
402                 compactSegments.clear();
403                 resetHead(segmentEntry.getValue().firstIndex());
404             }
405         }
406     }
407
408     @Override
409     public void close() {
410         if (currentSegment != null) {
411             currentSegment = null;
412             segments.values().forEach(JournalSegment::close);
413             segments.clear();
414         }
415     }
416
417     /**
418      * Returns whether {@code flushOnCommit} is enabled for the log.
419      *
420      * @return Indicates whether {@code flushOnCommit} is enabled for the log.
421      */
422     boolean isFlushOnCommit() {
423         return flushOnCommit;
424     }
425
426     /**
427      * Updates commit index to the given value.
428      *
429      * @param index The index value.
430      */
431     void setCommitIndex(final long index) {
432         commitIndex = index;
433     }
434
435     /**
436      * Returns the journal last commit index.
437      *
438      * @return The journal last commit index.
439      */
440     long getCommitIndex() {
441         return commitIndex;
442     }
443
444     public static Builder builder() {
445         return new Builder();
446     }
447
448     /**
449      * Segmented byte journal builder.
450      */
451     public static final class Builder {
452         private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
453         private static final String DEFAULT_NAME = "atomix";
454         private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
455         private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
456         private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
457         private static final double DEFAULT_INDEX_DENSITY = .005;
458
459         private String name = DEFAULT_NAME;
460         private StorageLevel storageLevel = StorageLevel.DISK;
461         private File directory = new File(DEFAULT_DIRECTORY);
462         private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
463         private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
464         private double indexDensity = DEFAULT_INDEX_DENSITY;
465         private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
466
467         private Builder() {
468             // on purpose
469         }
470
471         /**
472          * Sets the journal name.
473          *
474          * @param name The journal name.
475          * @return The builder instance
476          */
477         public Builder withName(final String name) {
478             this.name = requireNonNull(name, "name cannot be null");
479             return this;
480         }
481
482         /**
483          * Sets the storage level.
484          *
485          * @param storageLevel The storage level.
486          * @return The builder instance
487          */
488         public Builder withStorageLevel(final StorageLevel storageLevel) {
489             this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
490             return this;
491         }
492
493         /**
494          * Sets the journal directory.
495          *
496          * @param directory The log directory.
497          * @return The builder instance
498          * @throws NullPointerException If the {@code directory} is {@code null}
499          */
500         public Builder withDirectory(final String directory) {
501             return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
502         }
503
504         /**
505          * Sets the journal directory
506          *
507          * @param directory The log directory.
508          * @return The builder instance
509          * @throws NullPointerException If the {@code directory} is {@code null}
510          */
511         public Builder withDirectory(final File directory) {
512             this.directory = requireNonNull(directory, "directory cannot be null");
513             return this;
514         }
515
516         /**
517          * Sets the maximum segment size in bytes.
518          * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
519          *
520          * @param maxSegmentSize The maximum segment size in bytes.
521          * @return The builder instance
522          * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
523          */
524         public Builder withMaxSegmentSize(final int maxSegmentSize) {
525             checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
526                 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
527             this.maxSegmentSize = maxSegmentSize;
528             return this;
529         }
530
531         /**
532          * Sets the maximum entry size in bytes.
533          *
534          * @param maxEntrySize the maximum entry size in bytes
535          * @return the builder instance
536          * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
537          */
538         public Builder withMaxEntrySize(final int maxEntrySize) {
539             checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
540             this.maxEntrySize = maxEntrySize;
541             return this;
542         }
543
544         /**
545          * Sets the journal index density.
546          * <p>
547          * The index density is the frequency at which the position of entries written to the journal will be
548          * recorded in an in-memory index for faster seeking.
549          *
550          * @param indexDensity the index density
551          * @return the builder instance
552          * @throws IllegalArgumentException if the density is not between 0 and 1
553          */
554         public Builder withIndexDensity(final double indexDensity) {
555             checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
556             this.indexDensity = indexDensity;
557             return this;
558         }
559
560         /**
561          * Enables flushing buffers to disk when entries are committed to a segment.
562          * <p>
563          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
564          * an entry is committed in a given segment.
565          *
566          * @return The builder instance
567          */
568         public Builder withFlushOnCommit() {
569             return withFlushOnCommit(true);
570         }
571
572         /**
573          * Sets whether to flush buffers to disk when entries are committed to a segment.
574          * <p>
575          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
576          * an entry is committed in a given segment.
577          *
578          * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
579          * @return The builder instance
580          */
581         public Builder withFlushOnCommit(final boolean flushOnCommit) {
582             this.flushOnCommit = flushOnCommit;
583             return this;
584         }
585
586         /**
587          * Build the {@link SegmentedByteBufJournal}.
588          *
589          * @return {@link SegmentedByteBufJournal} instance built.
590          */
591         public SegmentedByteBufJournal build() {
592             return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
593                 indexDensity, flushOnCommit);
594         }
595     }
596 }