9fef881edd64d513ce1b9123f180d6be7e222429
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.util.Collection;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentNavigableMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Segmented journal.
36  */
37 public final class SegmentedJournal<E> implements Journal<E> {
38   /**
39    * Returns a new Raft log builder.
40    *
41    * @return A new Raft log builder.
42    */
43   public static <E> Builder<E> builder() {
44     return new Builder<>();
45   }
46
47   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
48   private static final int SEGMENT_BUFFER_FACTOR = 3;
49
50   private final String name;
51   private final StorageLevel storageLevel;
52   private final File directory;
53   private final JournalSerializer<E> serializer;
54   private final int maxSegmentSize;
55   private final int maxEntrySize;
56   private final int maxEntriesPerSegment;
57   private final double indexDensity;
58   private final boolean flushOnCommit;
59   private final SegmentedJournalWriter<E> writer;
60   private volatile long commitIndex;
61
62   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
63   private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
64   private JournalSegment currentSegment;
65
66   private volatile boolean open = true;
67
68   public SegmentedJournal(
69       String name,
70       StorageLevel storageLevel,
71       File directory,
72       JournalSerdes namespace,
73       int maxSegmentSize,
74       int maxEntrySize,
75       int maxEntriesPerSegment,
76       double indexDensity,
77       boolean flushOnCommit) {
78     this.name = requireNonNull(name, "name cannot be null");
79     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
80     this.directory = requireNonNull(directory, "directory cannot be null");
81     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
82     this.maxSegmentSize = maxSegmentSize;
83     this.maxEntrySize = maxEntrySize;
84     this.maxEntriesPerSegment = maxEntriesPerSegment;
85     this.indexDensity = indexDensity;
86     this.flushOnCommit = flushOnCommit;
87     open();
88     this.writer = new SegmentedJournalWriter<>(this);
89   }
90
91   /**
92    * Returns the segment file name prefix.
93    *
94    * @return The segment file name prefix.
95    */
96   public String name() {
97     return name;
98   }
99
100   /**
101    * Returns the storage directory.
102    * <p>
103    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
104    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
105    * when the log is opened.
106    *
107    * @return The storage directory.
108    */
109   public File directory() {
110     return directory;
111   }
112
113   /**
114    * Returns the storage level.
115    * <p>
116    * The storage level dictates how entries within individual journal segments should be stored.
117    *
118    * @return The storage level.
119    */
120   public StorageLevel storageLevel() {
121     return storageLevel;
122   }
123
124   /**
125    * Returns the maximum journal segment size.
126    * <p>
127    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
128    *
129    * @return The maximum segment size in bytes.
130    */
131   public int maxSegmentSize() {
132     return maxSegmentSize;
133   }
134
135   /**
136    * Returns the maximum journal entry size.
137    * <p>
138    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
139    *
140    * @return the maximum entry size in bytes
141    */
142   public int maxEntrySize() {
143     return maxEntrySize;
144   }
145
146   /**
147    * Returns the maximum number of entries per segment.
148    * <p>
149    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
150    * in a journal.
151    *
152    * @return The maximum number of entries per segment.
153    * @deprecated since 3.0.2
154    */
155   @Deprecated
156   public int maxEntriesPerSegment() {
157     return maxEntriesPerSegment;
158   }
159
160   /**
161    * Returns the collection of journal segments.
162    *
163    * @return the collection of journal segments
164    */
165   public Collection<JournalSegment> segments() {
166     return segments.values();
167   }
168
169   /**
170    * Returns the collection of journal segments with indexes greater than the given index.
171    *
172    * @param index the starting index
173    * @return the journal segments starting with indexes greater than or equal to the given index
174    */
175   public Collection<JournalSegment> segments(long index) {
176     return segments.tailMap(index).values();
177   }
178
179   /**
180    * Returns serializer instance.
181    *
182    * @return serializer instance
183    */
184   JournalSerializer<E> serializer() {
185     return serializer;
186   }
187
188   /**
189    * Returns the total size of the journal.
190    *
191    * @return the total size of the journal
192    */
193   public long size() {
194     return segments.values().stream()
195         .mapToLong(JournalSegment::size)
196         .sum();
197   }
198
199   @Override
200   public JournalWriter<E> writer() {
201     return writer;
202   }
203
204   @Override
205   public JournalReader<E> openReader(long index) {
206     return openReader(index, JournalReader.Mode.ALL);
207   }
208
209   /**
210    * Opens a new Raft log reader with the given reader mode.
211    *
212    * @param index The index from which to begin reading entries.
213    * @param mode The mode in which to read entries.
214    * @return The Raft log reader.
215    */
216   @Override
217   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
218     final var segment = getSegment(index);
219     final var reader = switch (mode) {
220       case ALL -> new SegmentedJournalReader<>(this, segment);
221       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
222     };
223
224     // Forward reader to specified index
225     long next = reader.getNextIndex();
226     while (index > next && reader.tryAdvance()) {
227         next = reader.getNextIndex();
228     }
229
230     readers.add(reader);
231     return reader;
232   }
233
234   /**
235    * Opens the segments.
236    */
237   private synchronized void open() {
238     // Load existing log segments from disk.
239     for (var segment : loadSegments()) {
240       segments.put(segment.firstIndex(), segment);
241     }
242
243     // If a segment doesn't already exist, create an initial segment starting at index 1.
244     if (!segments.isEmpty()) {
245       currentSegment = segments.lastEntry().getValue();
246     } else {
247       currentSegment = createSegment(1, 1);
248       segments.put(1L, currentSegment);
249     }
250   }
251
252   /**
253    * Asserts that the manager is open.
254    *
255    * @throws IllegalStateException if the segment manager is not open
256    */
257   private void assertOpen() {
258     checkState(currentSegment != null, "journal not open");
259   }
260
261   /**
262    * Asserts that enough disk space is available to allocate a new segment.
263    */
264   private void assertDiskSpace() {
265     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
266       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
267     }
268   }
269
270   /**
271    * Resets the current segment, creating a new segment if necessary.
272    */
273   private synchronized void resetCurrentSegment() {
274     final var lastSegment = getLastSegment();
275     if (lastSegment == null) {
276       currentSegment = createSegment(1, 1);
277       segments.put(1L, currentSegment);
278     } else {
279       currentSegment = lastSegment;
280     }
281   }
282
283   /**
284    * Resets and returns the first segment in the journal.
285    *
286    * @param index the starting index of the journal
287    * @return the first segment
288    */
289   JournalSegment resetSegments(long index) {
290     assertOpen();
291
292     // If the index already equals the first segment index, skip the reset.
293     final var firstSegment = getFirstSegment();
294     if (index == firstSegment.firstIndex()) {
295       return firstSegment;
296     }
297
298     segments.values().forEach(JournalSegment::delete);
299     segments.clear();
300
301     currentSegment = createSegment(1, index);
302     segments.put(index, currentSegment);
303     return currentSegment;
304   }
305
306   /**
307    * Returns the first segment in the log.
308    *
309    * @throws IllegalStateException if the segment manager is not open
310    */
311   JournalSegment getFirstSegment() {
312     assertOpen();
313     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
314     return segment != null ? segment.getValue() : null;
315   }
316
317   /**
318    * Returns the last segment in the log.
319    *
320    * @throws IllegalStateException if the segment manager is not open
321    */
322   JournalSegment getLastSegment() {
323     assertOpen();
324     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
325     return segment != null ? segment.getValue() : null;
326   }
327
328   /**
329    * Creates and returns the next segment.
330    *
331    * @return The next segment.
332    * @throws IllegalStateException if the segment manager is not open
333    */
334   synchronized JournalSegment getNextSegment() {
335     assertOpen();
336     assertDiskSpace();
337
338     final var index = currentSegment.lastIndex() + 1;
339     final var lastSegment = getLastSegment();
340     currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index);
341     segments.put(index, currentSegment);
342     return currentSegment;
343   }
344
345   /**
346    * Returns the segment following the segment with the given ID.
347    *
348    * @param index The segment index with which to look up the next segment.
349    * @return The next segment for the given index.
350    */
351   JournalSegment getNextSegment(long index) {
352     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
353     return nextSegment != null ? nextSegment.getValue() : null;
354   }
355
356   /**
357    * Returns the segment for the given index.
358    *
359    * @param index The index for which to return the segment.
360    * @throws IllegalStateException if the segment manager is not open
361    */
362   synchronized JournalSegment getSegment(long index) {
363     assertOpen();
364     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
365     if (currentSegment != null && index > currentSegment.firstIndex()) {
366       return currentSegment;
367     }
368
369     // If the index is in another segment, get the entry with the next lowest first index.
370     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
371     if (segment != null) {
372       return segment.getValue();
373     }
374     return getFirstSegment();
375   }
376
377   /**
378    * Removes a segment.
379    *
380    * @param segment The segment to remove.
381    */
382   synchronized void removeSegment(JournalSegment segment) {
383     segments.remove(segment.firstIndex());
384     segment.delete();
385     resetCurrentSegment();
386   }
387
388   /**
389    * Creates a new segment.
390    */
391   JournalSegment createSegment(long id, long index) {
392     final var segmentFile = JournalSegmentFile.createSegmentFile(name, directory, id);
393     final var descriptor = JournalSegmentDescriptor.builder()
394         .withId(id)
395         .withIndex(index)
396         .withMaxSegmentSize(maxSegmentSize)
397         .withMaxEntries(maxEntriesPerSegment)
398         .withUpdated(System.currentTimeMillis())
399         .build();
400
401     try (var raf = new RandomAccessFile(segmentFile, "rw")) {
402       raf.setLength(maxSegmentSize);
403       raf.write(descriptor.toArray());
404     } catch (IOException e) {
405       throw new StorageException(e);
406     }
407
408     final var segment = newSegment(new JournalSegmentFile(segmentFile.toPath(), descriptor));
409     LOG.debug("Created segment: {}", segment);
410     return segment;
411   }
412
413   /**
414    * Creates a new segment instance.
415    *
416    * @param segmentFile The segment file.
417    * @return The segment instance.
418    */
419   protected JournalSegment newSegment(JournalSegmentFile segmentFile) {
420     return new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
421   }
422
423   /**
424    * Loads all segments from disk.
425    *
426    * @return A collection of segments for the log.
427    */
428   protected Collection<JournalSegment> loadSegments() {
429     // Ensure log directories are created.
430     directory.mkdirs();
431
432     final var segments = new TreeMap<Long, JournalSegment>();
433
434     // Iterate through all files in the log directory.
435     for (var file : directory.listFiles(File::isFile)) {
436
437       // If the file looks like a segment file, attempt to load the segment.
438       if (JournalSegmentFile.isSegmentFile(name, file)) {
439         final var path = file.toPath();
440         // read the descriptor
441         final JournalSegmentDescriptor descriptor;
442         try {
443           descriptor = JournalSegmentDescriptor.readFrom(path);
444         } catch (IOException e) {
445           throw new StorageException(e);
446         }
447
448         // Load the segment.
449         final var segment = newSegment(new JournalSegmentFile(path, descriptor));
450         LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), path);
451
452         // Add the segment to the segments list.
453         segments.put(segment.firstIndex(), segment);
454       }
455     }
456
457     // Verify that all the segments in the log align with one another.
458     JournalSegment previousSegment = null;
459     boolean corrupted = false;
460     final var iterator = segments.entrySet().iterator();
461     while (iterator.hasNext()) {
462       final var segment = iterator.next().getValue();
463       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
464         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
465             previousSegment.file().path());
466         corrupted = true;
467       }
468       if (corrupted) {
469         segment.delete();
470         iterator.remove();
471       }
472       previousSegment = segment;
473     }
474
475     return segments.values();
476   }
477
478   /**
479    * Resets journal readers to the given head.
480    *
481    * @param index The index at which to reset readers.
482    */
483   void resetHead(long index) {
484     for (var reader : readers) {
485       if (reader.getNextIndex() < index) {
486         reader.reset(index);
487       }
488     }
489   }
490
491   /**
492    * Resets journal readers to the given tail.
493    *
494    * @param index The index at which to reset readers.
495    */
496   void resetTail(long index) {
497     for (var reader : readers) {
498       if (reader.getNextIndex() >= index) {
499         reader.reset(index);
500       }
501     }
502   }
503
504   void closeReader(SegmentedJournalReader<E> reader) {
505     readers.remove(reader);
506   }
507
508   @Override
509   public boolean isOpen() {
510     return open;
511   }
512
513   /**
514    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
515    *
516    * @param index the index from which to remove segments
517    * @return indicates whether a segment can be removed from the journal
518    */
519   public boolean isCompactable(long index) {
520     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
521     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
522   }
523
524   /**
525    * Returns the index of the last segment in the log.
526    *
527    * @param index the compaction index
528    * @return the starting index of the last segment in the log
529    */
530   public long getCompactableIndex(long index) {
531     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
532     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
533   }
534
535   /**
536    * Compacts the journal up to the given index.
537    * <p>
538    * The semantics of compaction are not specified by this interface.
539    *
540    * @param index The index up to which to compact the journal.
541    */
542   public void compact(long index) {
543     final var segmentEntry = segments.floorEntry(index);
544     if (segmentEntry != null) {
545       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
546       if (!compactSegments.isEmpty()) {
547         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
548         compactSegments.values().forEach(JournalSegment::delete);
549         compactSegments.clear();
550         resetHead(segmentEntry.getValue().firstIndex());
551       }
552     }
553   }
554
555   @Override
556   public void close() {
557     segments.values().forEach(JournalSegment::close);
558     currentSegment = null;
559     open = false;
560   }
561
562   /**
563    * Returns whether {@code flushOnCommit} is enabled for the log.
564    *
565    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
566    */
567   boolean isFlushOnCommit() {
568     return flushOnCommit;
569   }
570
571   /**
572    * Commits entries up to the given index.
573    *
574    * @param index The index up to which to commit entries.
575    */
576   void setCommitIndex(long index) {
577     this.commitIndex = index;
578   }
579
580   /**
581    * Returns the Raft log commit index.
582    *
583    * @return The Raft log commit index.
584    */
585   long getCommitIndex() {
586     return commitIndex;
587   }
588
589   /**
590    * Raft log builder.
591    */
592   public static final class Builder<E> {
593     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
594     private static final String DEFAULT_NAME = "atomix";
595     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
596     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
597     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
598     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
599     private static final double DEFAULT_INDEX_DENSITY = .005;
600
601     private String name = DEFAULT_NAME;
602     private StorageLevel storageLevel = StorageLevel.DISK;
603     private File directory = new File(DEFAULT_DIRECTORY);
604     private JournalSerdes namespace;
605     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
606     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
607     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
608     private double indexDensity = DEFAULT_INDEX_DENSITY;
609     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
610
611     Builder() {
612       // Hidden on purpose
613     }
614
615     /**
616      * Sets the storage name.
617      *
618      * @param name The storage name.
619      * @return The storage builder.
620      */
621     public Builder<E> withName(String name) {
622       this.name = requireNonNull(name, "name cannot be null");
623       return this;
624     }
625
626     /**
627      * Sets the log storage level, returning the builder for method chaining.
628      * <p>
629      * The storage level indicates how individual entries should be persisted in the journal.
630      *
631      * @param storageLevel The log storage level.
632      * @return The storage builder.
633      */
634     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
635       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
636       return this;
637     }
638
639     /**
640      * Sets the log directory, returning the builder for method chaining.
641      * <p>
642      * The log will write segment files into the provided directory.
643      *
644      * @param directory The log directory.
645      * @return The storage builder.
646      * @throws NullPointerException If the {@code directory} is {@code null}
647      */
648     public Builder<E> withDirectory(String directory) {
649       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
650     }
651
652     /**
653      * Sets the log directory, returning the builder for method chaining.
654      * <p>
655      * The log will write segment files into the provided directory.
656      *
657      * @param directory The log directory.
658      * @return The storage builder.
659      * @throws NullPointerException If the {@code directory} is {@code null}
660      */
661     public Builder<E> withDirectory(File directory) {
662       this.directory = requireNonNull(directory, "directory cannot be null");
663       return this;
664     }
665
666     /**
667      * Sets the journal namespace, returning the builder for method chaining.
668      *
669      * @param namespace The journal serializer.
670      * @return The journal builder.
671      */
672     public Builder<E> withNamespace(JournalSerdes namespace) {
673       this.namespace = requireNonNull(namespace, "namespace cannot be null");
674       return this;
675     }
676
677     /**
678      * Sets the maximum segment size in bytes, returning the builder for method chaining.
679      * <p>
680      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
681      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
682      * segment and append new entries to that segment.
683      * <p>
684      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
685      *
686      * @param maxSegmentSize The maximum segment size in bytes.
687      * @return The storage builder.
688      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
689      */
690     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
691       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
692           "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
693       this.maxSegmentSize = maxSegmentSize;
694       return this;
695     }
696
697     /**
698      * Sets the maximum entry size in bytes, returning the builder for method chaining.
699      *
700      * @param maxEntrySize the maximum entry size in bytes
701      * @return the storage builder
702      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
703      */
704     public Builder<E> withMaxEntrySize(int maxEntrySize) {
705       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
706       this.maxEntrySize = maxEntrySize;
707       return this;
708     }
709
710     /**
711      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
712      * <p>
713      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
714      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
715      * new segment and append new entries to that segment.
716      * <p>
717      * By default, the maximum entries per segment is {@code 1024 * 1024}.
718      *
719      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
720      * @return The storage builder.
721      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
722      *     per segment
723      * @deprecated since 3.0.2
724      */
725     @Deprecated
726     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
727       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
728       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
729           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
730       this.maxEntriesPerSegment = maxEntriesPerSegment;
731       return this;
732     }
733
734     /**
735      * Sets the journal index density.
736      * <p>
737      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
738      * in-memory index for faster seeking.
739      *
740      * @param indexDensity the index density
741      * @return the journal builder
742      * @throws IllegalArgumentException if the density is not between 0 and 1
743      */
744     public Builder<E> withIndexDensity(double indexDensity) {
745       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
746       this.indexDensity = indexDensity;
747       return this;
748     }
749
750     /**
751      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
752      * chaining.
753      * <p>
754      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
755      * committed in a given segment.
756      *
757      * @return The storage builder.
758      */
759     public Builder<E> withFlushOnCommit() {
760       return withFlushOnCommit(true);
761     }
762
763     /**
764      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
765      * chaining.
766      * <p>
767      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
768      * committed in a given segment.
769      *
770      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
771      * @return The storage builder.
772      */
773     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
774       this.flushOnCommit = flushOnCommit;
775       return this;
776     }
777
778     /**
779      * Build the {@link SegmentedJournal}.
780      *
781      * @return A new {@link SegmentedJournal}.
782      */
783     public SegmentedJournal<E> build() {
784       return new SegmentedJournal<>(
785           name,
786           storageLevel,
787           directory,
788           namespace,
789           maxSegmentSize,
790           maxEntrySize,
791           maxEntriesPerSegment,
792           indexDensity,
793           flushOnCommit);
794     }
795   }
796 }