0d42181851f5c4d7e587e64180e65ba97f35baed
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkState;
21 import static java.util.Objects.requireNonNull;
22
23 import java.io.File;
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import java.util.function.BiFunction;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * A {@link ByteBufJournal} Implementation.
39  */
40 public final class SegmentedByteBufJournal implements ByteBufJournal {
41     private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
42     private static final int SEGMENT_BUFFER_FACTOR = 3;
43
44     private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
45     private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
46     private final String name;
47     private final StorageLevel storageLevel;
48     private final File directory;
49     private final int maxSegmentSize;
50     private final int maxEntrySize;
51     private final double indexDensity;
52     private final boolean flushOnCommit;
53     private final @NonNull ByteBufWriter writer;
54
55     // null when closed
56     private JournalSegment currentSegment;
57     private volatile long commitIndex;
58
59     public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
60             final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
61         this.name = requireNonNull(name, "name cannot be null");
62         this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
63         this.directory = requireNonNull(directory, "directory cannot be null");
64         this.maxSegmentSize = maxSegmentSize;
65         this.maxEntrySize = maxEntrySize;
66         this.indexDensity = indexDensity;
67         this.flushOnCommit = flushOnCommit;
68
69         // Load existing log segments from disk.
70         for (var segment : loadSegments()) {
71             segments.put(segment.firstIndex(), segment);
72         }
73         currentSegment = ensureLastSegment();
74
75         writer = new SegmentedByteBufWriter(this);
76     }
77
78     /**
79      * Returns the total size of the journal.
80      *
81      * @return the total size of the journal
82      */
83     public long size() {
84         return segments.values().stream()
85             .mapToLong(segment -> {
86                 try {
87                     return segment.file().size();
88                 } catch (IOException e) {
89                     throw new StorageException(e);
90                 }
91             })
92             .sum();
93     }
94
95     @Override
96     public long lastIndex() {
97         return lastSegment().lastIndex();
98     }
99
100     @Override
101     public ByteBufWriter writer() {
102         return writer;
103     }
104
105     @Override
106     public ByteBufReader openReader(final long index) {
107         return openReader(index, SegmentedByteBufReader::new);
108     }
109
110     @NonNullByDefault
111     private ByteBufReader openReader(final long index,
112             final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
113         final var reader = constructor.apply(this, segment(index));
114         reader.reset(index);
115         readers.add(reader);
116         return reader;
117     }
118
119     @Override
120     public ByteBufReader openCommitsReader(final long index) {
121         return openReader(index, SegmentedCommitsByteBufReader::new);
122     }
123
124     /**
125      * Asserts that the manager is open.
126      *
127      * @throws IllegalStateException if the segment manager is not open
128      */
129     private void assertOpen() {
130         checkState(currentSegment != null, "journal not open");
131     }
132
133     /**
134      * Asserts that enough disk space is available to allocate a new segment.
135      */
136     private void assertDiskSpace() {
137         if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
138             throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
139         }
140     }
141
142     /**
143      * Resets and returns the first segment in the journal.
144      *
145      * @param index the starting index of the journal
146      * @return the first segment
147      */
148     JournalSegment resetSegments(final long index) {
149         assertOpen();
150
151         // If the index already equals the first segment index, skip the reset.
152         final var firstSegment = firstSegment();
153         if (index == firstSegment.firstIndex()) {
154             return firstSegment;
155         }
156
157         segments.values().forEach(JournalSegment::delete);
158         segments.clear();
159         final var newSegment = createInitialSegment();
160         currentSegment = newSegment;
161         return newSegment;
162     }
163
164     /**
165      * Returns the first segment in the log.
166      *
167      * @throws IllegalStateException if the segment manager is not open
168      */
169     JournalSegment firstSegment() {
170         assertOpen();
171         return segments.firstEntry().getValue();
172     }
173
174     /**
175      * Returns the last segment in the log.
176      *
177      * @throws IllegalStateException if the segment manager is not open
178      */
179     JournalSegment lastSegment() {
180         assertOpen();
181         return segments.lastEntry().getValue();
182     }
183
184     /**
185      * Returns the segment following the segment with the given ID.
186      *
187      * @param index The segment index with which to look up the next segment.
188      * @return The next segment for the given index, or {@code null} if no such segment exists
189      */
190     @Nullable JournalSegment tryNextSegment(final long index) {
191         final var higherEntry = segments.higherEntry(index);
192         return higherEntry != null ? higherEntry.getValue() : null;
193     }
194
195     /**
196      * Creates and returns the next segment.
197      *
198      * @return The next segment.
199      * @throws IllegalStateException if the segment manager is not open
200      */
201     synchronized @NonNull JournalSegment createNextSegment() {
202         assertOpen();
203         assertDiskSpace();
204
205         // FIXME: lastSegment should equal currentSegment. We should be asserting that.
206         final var index = currentSegment.lastIndex() + 1;
207         final var lastSegment = lastSegment();
208         final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index);
209         segments.put(index, nextSegment);
210         currentSegment = nextSegment;
211         return nextSegment;
212     }
213
214     /**
215      * Returns the segment for the given index.
216      *
217      * @param index The index for which to return the segment.
218      * @throws IllegalStateException if the segment manager is not open
219      */
220     synchronized JournalSegment segment(final long index) {
221         assertOpen();
222         // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
223         if (currentSegment != null && index > currentSegment.firstIndex()) {
224             return currentSegment;
225         }
226
227         // If the index is in another segment, get the entry with the next lowest first index.
228         final var segment = segments.floorEntry(index);
229         return segment != null ? segment.getValue() : firstSegment();
230     }
231
232     /**
233      * Removes a segment.
234      *
235      * @param segment The segment to remove.
236      */
237     synchronized void removeSegment(final JournalSegment segment) {
238         segments.remove(segment.firstIndex());
239         segment.delete();
240
241         // Reset current segment to last segment
242         currentSegment = ensureLastSegment();
243     }
244
245     /**
246      * Creates a new segment.
247      *
248      * @param segmentId the segment ID
249      * @param firstIndex index of first entry
250      * @param A new segment
251      */
252     private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) {
253         final JournalSegmentFile file;
254         try {
255             file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
256                 .withId(segmentId)
257                 .withIndex(firstIndex)
258                 .withMaxSegmentSize(maxSegmentSize)
259                 // FIXME: propagate maxEntries
260                 .withMaxEntries(Integer.MAX_VALUE)
261                 .withUpdated(System.currentTimeMillis())
262                 .build());
263         } catch (IOException e) {
264             throw new StorageException(e);
265         }
266
267         final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
268         LOG.debug("Created segment: {}", segment);
269         return segment;
270     }
271
272     private @NonNull JournalSegment createInitialSegment() {
273         final var segment = createSegment(1, 1);
274         segments.put(1L, segment);
275         return segment;
276     }
277
278     /**
279      * Make sure there is a last segment and return it.
280      *
281      * @return the last segment
282      */
283     private JournalSegment ensureLastSegment() {
284         final var lastEntry = segments.lastEntry();
285         // if there is no segment, create an initial segment starting at index 1.
286         return lastEntry != null ? lastEntry.getValue() : createInitialSegment();
287     }
288
289     /**
290      * Loads all segments from disk.
291      *
292      * @return A collection of segments for the log.
293      */
294     private Collection<JournalSegment> loadSegments() {
295         // Ensure log directories are created.
296         directory.mkdirs();
297
298         final var segmentsMap = new TreeMap<Long, JournalSegment>();
299
300         // Iterate through all files in the log directory.
301         for (var file : directory.listFiles(File::isFile)) {
302
303             // If the file looks like a segment file, attempt to load the segment.
304             if (JournalSegmentFile.isSegmentFile(name, file)) {
305                 final JournalSegmentFile segmentFile;
306                 try {
307                     segmentFile = JournalSegmentFile.openExisting(file.toPath());
308                 } catch (IOException e) {
309                     throw new StorageException(e);
310                 }
311
312                 // Load the segment.
313                 LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
314
315                 // Add the segment to the segments list.
316                 final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
317                 segmentsMap.put(segment.firstIndex(), segment);
318             }
319         }
320
321         // Verify that all the segments in the log align with one another.
322         JournalSegment previousSegment = null;
323         boolean corrupted = false;
324         for (var iterator =  segmentsMap.entrySet().iterator(); iterator.hasNext();  ) {
325             final var segment = iterator.next().getValue();
326             if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
327                 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
328                     previousSegment.file().path());
329                 corrupted = true;
330             }
331             if (corrupted) {
332                 segment.delete();
333                 iterator.remove();
334             }
335             previousSegment = segment;
336         }
337
338         return segmentsMap.values();
339     }
340
341     /**
342      * Resets journal readers to the given head.
343      *
344      * @param index The index at which to reset readers.
345      */
346     void resetHead(final long index) {
347         for (var reader : readers) {
348             if (reader.nextIndex() < index) {
349                 reader.reset(index);
350             }
351         }
352     }
353
354     /**
355      * Resets journal readers to the given tail.
356      *
357      * @param index The index at which to reset readers.
358      */
359     void resetTail(final long index) {
360         for (var reader : readers) {
361             if (reader.nextIndex() >= index) {
362                 reader.reset(index);
363             }
364         }
365     }
366
367     void closeReader(final SegmentedByteBufReader reader) {
368         readers.remove(reader);
369     }
370
371     /**
372      * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
373      *
374      * @param index the index from which to remove segments
375      * @return indicates whether a segment can be removed from the journal
376      */
377     public boolean isCompactable(final long index) {
378         final var firstIndex = getCompactableIndex(index);
379         return firstIndex != 0 && !segments.headMap(firstIndex).isEmpty();
380     }
381
382     /**
383      * Returns the index of the last segment in the log.
384      *
385      * @param index the compaction index
386      * @return the starting index of the last segment in the log
387      */
388     public long getCompactableIndex(final long index) {
389         final var segmentEntry = segments.floorEntry(index);
390         return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
391     }
392
393     /**
394      * Compacts the journal up to the given index.
395      *
396      * <p>
397      * The semantics of compaction are not specified by this interface.
398      *
399      * @param index The index up to which to compact the journal.
400      */
401     public void compact(final long index) {
402         final var firstIndex = getCompactableIndex(index);
403         if (firstIndex != 0) {
404             final var compactSegments = segments.headMap(firstIndex);
405             if (!compactSegments.isEmpty()) {
406                 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
407                 compactSegments.values().forEach(JournalSegment::delete);
408                 compactSegments.clear();
409                 resetHead(firstIndex);
410             }
411         }
412     }
413
414     @Override
415     public void close() {
416         if (currentSegment != null) {
417             currentSegment = null;
418             segments.values().forEach(JournalSegment::close);
419             segments.clear();
420         }
421     }
422
423     /**
424      * Returns whether {@code flushOnCommit} is enabled for the log.
425      *
426      * @return Indicates whether {@code flushOnCommit} is enabled for the log.
427      */
428     boolean isFlushOnCommit() {
429         return flushOnCommit;
430     }
431
432     /**
433      * Updates commit index to the given value.
434      *
435      * @param index The index value.
436      */
437     void setCommitIndex(final long index) {
438         commitIndex = index;
439     }
440
441     /**
442      * Returns the journal last commit index.
443      *
444      * @return The journal last commit index.
445      */
446     long getCommitIndex() {
447         return commitIndex;
448     }
449
450     public static Builder builder() {
451         return new Builder();
452     }
453
454     /**
455      * Segmented byte journal builder.
456      */
457     public static final class Builder {
458         private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
459         private static final String DEFAULT_NAME = "atomix";
460         private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
461         private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
462         private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
463         private static final double DEFAULT_INDEX_DENSITY = .005;
464
465         private String name = DEFAULT_NAME;
466         private StorageLevel storageLevel = StorageLevel.DISK;
467         private File directory = new File(DEFAULT_DIRECTORY);
468         private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
469         private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
470         private double indexDensity = DEFAULT_INDEX_DENSITY;
471         private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
472
473         private Builder() {
474             // on purpose
475         }
476
477         /**
478          * Sets the journal name.
479          *
480          * @param name The journal name.
481          * @return The builder instance
482          */
483         public Builder withName(final String name) {
484             this.name = requireNonNull(name, "name cannot be null");
485             return this;
486         }
487
488         /**
489          * Sets the storage level.
490          *
491          * @param storageLevel The storage level.
492          * @return The builder instance
493          */
494         public Builder withStorageLevel(final StorageLevel storageLevel) {
495             this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
496             return this;
497         }
498
499         /**
500          * Sets the journal directory.
501          *
502          * @param directory The log directory.
503          * @return The builder instance
504          * @throws NullPointerException If the {@code directory} is {@code null}
505          */
506         public Builder withDirectory(final String directory) {
507             return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
508         }
509
510         /**
511          * Sets the journal directory.
512          *
513          * @param directory The log directory.
514          * @return The builder instance
515          * @throws NullPointerException If the {@code directory} is {@code null}
516          */
517         public Builder withDirectory(final File directory) {
518             this.directory = requireNonNull(directory, "directory cannot be null");
519             return this;
520         }
521
522         /**
523          * Sets the maximum segment size in bytes.
524          * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
525          *
526          * @param maxSegmentSize The maximum segment size in bytes.
527          * @return The builder instance
528          * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
529          */
530         public Builder withMaxSegmentSize(final int maxSegmentSize) {
531             checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
532                 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
533             this.maxSegmentSize = maxSegmentSize;
534             return this;
535         }
536
537         /**
538          * Sets the maximum entry size in bytes.
539          *
540          * @param maxEntrySize the maximum entry size in bytes
541          * @return the builder instance
542          * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
543          */
544         public Builder withMaxEntrySize(final int maxEntrySize) {
545             checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
546             this.maxEntrySize = maxEntrySize;
547             return this;
548         }
549
550         /**
551          * Sets the journal index density.
552          *
553          * <p>
554          * The index density is the frequency at which the position of entries written to the journal will be
555          * recorded in an in-memory index for faster seeking.
556          *
557          * @param indexDensity the index density
558          * @return the builder instance
559          * @throws IllegalArgumentException if the density is not between 0 and 1
560          */
561         public Builder withIndexDensity(final double indexDensity) {
562             checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
563             this.indexDensity = indexDensity;
564             return this;
565         }
566
567         /**
568          * Enables flushing buffers to disk when entries are committed to a segment.
569          *
570          * <p>
571          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
572          * an entry is committed in a given segment.
573          *
574          * @return The builder instance
575          */
576         public Builder withFlushOnCommit() {
577             return withFlushOnCommit(true);
578         }
579
580         /**
581          * Sets whether to flush buffers to disk when entries are committed to a segment.
582          *
583          * <p>
584          * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
585          * an entry is committed in a given segment.
586          *
587          * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
588          * @return The builder instance
589          */
590         public Builder withFlushOnCommit(final boolean flushOnCommit) {
591             this.flushOnCommit = flushOnCommit;
592             return this;
593         }
594
595         /**
596          * Build the {@link SegmentedByteBufJournal}.
597          *
598          * @return {@link SegmentedByteBufJournal} instance built.
599          */
600         public SegmentedByteBufJournal build() {
601             return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
602                 indexDensity, flushOnCommit);
603         }
604     }
605 }