Expand JournalSegmentFile semantics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.util.Collection;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentNavigableMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Segmented journal.
36  */
37 public final class SegmentedJournal<E> implements Journal<E> {
38   /**
39    * Returns a new Raft log builder.
40    *
41    * @return A new Raft log builder.
42    */
43   public static <E> Builder<E> builder() {
44     return new Builder<>();
45   }
46
47   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
48   private static final int SEGMENT_BUFFER_FACTOR = 3;
49
50   private final String name;
51   private final StorageLevel storageLevel;
52   private final File directory;
53   private final JournalSerializer<E> serializer;
54   private final int maxSegmentSize;
55   private final int maxEntrySize;
56   private final int maxEntriesPerSegment;
57   private final double indexDensity;
58   private final boolean flushOnCommit;
59   private final SegmentedJournalWriter<E> writer;
60   private volatile long commitIndex;
61
62   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
63   private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
64   private JournalSegment currentSegment;
65
66   private volatile boolean open = true;
67
68   public SegmentedJournal(
69       String name,
70       StorageLevel storageLevel,
71       File directory,
72       JournalSerdes namespace,
73       int maxSegmentSize,
74       int maxEntrySize,
75       int maxEntriesPerSegment,
76       double indexDensity,
77       boolean flushOnCommit) {
78     this.name = requireNonNull(name, "name cannot be null");
79     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
80     this.directory = requireNonNull(directory, "directory cannot be null");
81     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
82     this.maxSegmentSize = maxSegmentSize;
83     this.maxEntrySize = maxEntrySize;
84     this.maxEntriesPerSegment = maxEntriesPerSegment;
85     this.indexDensity = indexDensity;
86     this.flushOnCommit = flushOnCommit;
87     open();
88     this.writer = new SegmentedJournalWriter<>(this);
89   }
90
91   /**
92    * Returns the segment file name prefix.
93    *
94    * @return The segment file name prefix.
95    */
96   public String name() {
97     return name;
98   }
99
100   /**
101    * Returns the storage directory.
102    * <p>
103    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
104    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
105    * when the log is opened.
106    *
107    * @return The storage directory.
108    */
109   public File directory() {
110     return directory;
111   }
112
113   /**
114    * Returns the storage level.
115    * <p>
116    * The storage level dictates how entries within individual journal segments should be stored.
117    *
118    * @return The storage level.
119    */
120   public StorageLevel storageLevel() {
121     return storageLevel;
122   }
123
124   /**
125    * Returns the maximum journal segment size.
126    * <p>
127    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
128    *
129    * @return The maximum segment size in bytes.
130    */
131   public int maxSegmentSize() {
132     return maxSegmentSize;
133   }
134
135   /**
136    * Returns the maximum journal entry size.
137    * <p>
138    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
139    *
140    * @return the maximum entry size in bytes
141    */
142   public int maxEntrySize() {
143     return maxEntrySize;
144   }
145
146   /**
147    * Returns the maximum number of entries per segment.
148    * <p>
149    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
150    * in a journal.
151    *
152    * @return The maximum number of entries per segment.
153    * @deprecated since 3.0.2
154    */
155   @Deprecated
156   public int maxEntriesPerSegment() {
157     return maxEntriesPerSegment;
158   }
159
160   /**
161    * Returns the collection of journal segments.
162    *
163    * @return the collection of journal segments
164    */
165   public Collection<JournalSegment> segments() {
166     return segments.values();
167   }
168
169   /**
170    * Returns the collection of journal segments with indexes greater than the given index.
171    *
172    * @param index the starting index
173    * @return the journal segments starting with indexes greater than or equal to the given index
174    */
175   public Collection<JournalSegment> segments(long index) {
176     return segments.tailMap(index).values();
177   }
178
179   /**
180    * Returns serializer instance.
181    *
182    * @return serializer instance
183    */
184   JournalSerializer<E> serializer() {
185     return serializer;
186   }
187
188   /**
189    * Returns the total size of the journal.
190    *
191    * @return the total size of the journal
192    */
193   public long size() {
194     return segments.values().stream()
195         .mapToLong(JournalSegment::size)
196         .sum();
197   }
198
199   @Override
200   public JournalWriter<E> writer() {
201     return writer;
202   }
203
204   @Override
205   public JournalReader<E> openReader(long index) {
206     return openReader(index, JournalReader.Mode.ALL);
207   }
208
209   /**
210    * Opens a new Raft log reader with the given reader mode.
211    *
212    * @param index The index from which to begin reading entries.
213    * @param mode The mode in which to read entries.
214    * @return The Raft log reader.
215    */
216   @Override
217   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
218     final var segment = getSegment(index);
219     final var reader = switch (mode) {
220       case ALL -> new SegmentedJournalReader<>(this, segment);
221       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
222     };
223
224     // Forward reader to specified index
225     long next = reader.getNextIndex();
226     while (index > next && reader.tryAdvance()) {
227         next = reader.getNextIndex();
228     }
229
230     readers.add(reader);
231     return reader;
232   }
233
234   /**
235    * Opens the segments.
236    */
237   private synchronized void open() {
238     // Load existing log segments from disk.
239     for (var segment : loadSegments()) {
240       segments.put(segment.firstIndex(), segment);
241     }
242
243     // If a segment doesn't already exist, create an initial segment starting at index 1.
244     if (!segments.isEmpty()) {
245       currentSegment = segments.lastEntry().getValue();
246     } else {
247       currentSegment = createSegment(1, 1);
248       segments.put(1L, currentSegment);
249     }
250   }
251
252   /**
253    * Asserts that the manager is open.
254    *
255    * @throws IllegalStateException if the segment manager is not open
256    */
257   private void assertOpen() {
258     checkState(currentSegment != null, "journal not open");
259   }
260
261   /**
262    * Asserts that enough disk space is available to allocate a new segment.
263    */
264   private void assertDiskSpace() {
265     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
266       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
267     }
268   }
269
270   /**
271    * Resets the current segment, creating a new segment if necessary.
272    */
273   private synchronized void resetCurrentSegment() {
274     final var lastSegment = getLastSegment();
275     if (lastSegment == null) {
276       currentSegment = createSegment(1, 1);
277       segments.put(1L, currentSegment);
278     } else {
279       currentSegment = lastSegment;
280     }
281   }
282
283   /**
284    * Resets and returns the first segment in the journal.
285    *
286    * @param index the starting index of the journal
287    * @return the first segment
288    */
289   JournalSegment resetSegments(long index) {
290     assertOpen();
291
292     // If the index already equals the first segment index, skip the reset.
293     JournalSegment firstSegment = getFirstSegment();
294     if (index == firstSegment.firstIndex()) {
295       return firstSegment;
296     }
297
298     for (JournalSegment segment : segments.values()) {
299       segment.close();
300       segment.delete();
301     }
302     segments.clear();
303
304     currentSegment = createSegment(1, index);
305     segments.put(index, currentSegment);
306     return currentSegment;
307   }
308
309   /**
310    * Returns the first segment in the log.
311    *
312    * @throws IllegalStateException if the segment manager is not open
313    */
314   JournalSegment getFirstSegment() {
315     assertOpen();
316     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
317     return segment != null ? segment.getValue() : null;
318   }
319
320   /**
321    * Returns the last segment in the log.
322    *
323    * @throws IllegalStateException if the segment manager is not open
324    */
325   JournalSegment getLastSegment() {
326     assertOpen();
327     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
328     return segment != null ? segment.getValue() : null;
329   }
330
331   /**
332    * Creates and returns the next segment.
333    *
334    * @return The next segment.
335    * @throws IllegalStateException if the segment manager is not open
336    */
337   synchronized JournalSegment getNextSegment() {
338     assertOpen();
339     assertDiskSpace();
340
341     final var index = currentSegment.lastIndex() + 1;
342     final var lastSegment = getLastSegment();
343     currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index);
344     segments.put(index, currentSegment);
345     return currentSegment;
346   }
347
348   /**
349    * Returns the segment following the segment with the given ID.
350    *
351    * @param index The segment index with which to look up the next segment.
352    * @return The next segment for the given index.
353    */
354   JournalSegment getNextSegment(long index) {
355     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
356     return nextSegment != null ? nextSegment.getValue() : null;
357   }
358
359   /**
360    * Returns the segment for the given index.
361    *
362    * @param index The index for which to return the segment.
363    * @throws IllegalStateException if the segment manager is not open
364    */
365   synchronized JournalSegment getSegment(long index) {
366     assertOpen();
367     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
368     if (currentSegment != null && index > currentSegment.firstIndex()) {
369       return currentSegment;
370     }
371
372     // If the index is in another segment, get the entry with the next lowest first index.
373     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
374     if (segment != null) {
375       return segment.getValue();
376     }
377     return getFirstSegment();
378   }
379
380   /**
381    * Removes a segment.
382    *
383    * @param segment The segment to remove.
384    */
385   synchronized void removeSegment(JournalSegment segment) {
386     segments.remove(segment.firstIndex());
387     segment.close();
388     segment.delete();
389     resetCurrentSegment();
390   }
391
392   /**
393    * Creates a new segment.
394    */
395   JournalSegment createSegment(long id, long index) {
396     final var segmentFile = JournalSegmentFile.createSegmentFile(name, directory, id);
397     final var descriptor = JournalSegmentDescriptor.builder()
398         .withId(id)
399         .withIndex(index)
400         .withMaxSegmentSize(maxSegmentSize)
401         .withMaxEntries(maxEntriesPerSegment)
402         .withUpdated(System.currentTimeMillis())
403         .build();
404
405     try (var raf = new RandomAccessFile(segmentFile, "rw")) {
406       raf.setLength(maxSegmentSize);
407       raf.write(descriptor.toArray());
408     } catch (IOException e) {
409       throw new StorageException(e);
410     }
411
412     final var segment = newSegment(new JournalSegmentFile(segmentFile.toPath(), descriptor));
413     LOG.debug("Created segment: {}", segment);
414     return segment;
415   }
416
417   /**
418    * Creates a new segment instance.
419    *
420    * @param segmentFile The segment file.
421    * @return The segment instance.
422    */
423   protected JournalSegment newSegment(JournalSegmentFile segmentFile) {
424     return new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
425   }
426
427   /**
428    * Loads all segments from disk.
429    *
430    * @return A collection of segments for the log.
431    */
432   protected Collection<JournalSegment> loadSegments() {
433     // Ensure log directories are created.
434     directory.mkdirs();
435
436     final var segments = new TreeMap<Long, JournalSegment>();
437
438     // Iterate through all files in the log directory.
439     for (var file : directory.listFiles(File::isFile)) {
440
441       // If the file looks like a segment file, attempt to load the segment.
442       if (JournalSegmentFile.isSegmentFile(name, file)) {
443         final var path = file.toPath();
444         // read the descriptor
445         final JournalSegmentDescriptor descriptor;
446         try {
447           descriptor = JournalSegmentDescriptor.readFrom(path);
448         } catch (IOException e) {
449           throw new StorageException(e);
450         }
451
452         // Load the segment.
453         final var segment = newSegment(new JournalSegmentFile(path, descriptor));
454         LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), path);
455
456         // Add the segment to the segments list.
457         segments.put(segment.firstIndex(), segment);
458       }
459     }
460
461     // Verify that all the segments in the log align with one another.
462     JournalSegment previousSegment = null;
463     boolean corrupted = false;
464     final var iterator = segments.entrySet().iterator();
465     while (iterator.hasNext()) {
466       final var segment = iterator.next().getValue();
467       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
468         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
469             previousSegment.file().path());
470         corrupted = true;
471       }
472       if (corrupted) {
473         segment.close();
474         segment.delete();
475         iterator.remove();
476       }
477       previousSegment = segment;
478     }
479
480     return segments.values();
481   }
482
483   /**
484    * Resets journal readers to the given head.
485    *
486    * @param index The index at which to reset readers.
487    */
488   void resetHead(long index) {
489     for (var reader : readers) {
490       if (reader.getNextIndex() < index) {
491         reader.reset(index);
492       }
493     }
494   }
495
496   /**
497    * Resets journal readers to the given tail.
498    *
499    * @param index The index at which to reset readers.
500    */
501   void resetTail(long index) {
502     for (var reader : readers) {
503       if (reader.getNextIndex() >= index) {
504         reader.reset(index);
505       }
506     }
507   }
508
509   void closeReader(SegmentedJournalReader<E> reader) {
510     readers.remove(reader);
511   }
512
513   @Override
514   public boolean isOpen() {
515     return open;
516   }
517
518   /**
519    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
520    *
521    * @param index the index from which to remove segments
522    * @return indicates whether a segment can be removed from the journal
523    */
524   public boolean isCompactable(long index) {
525     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
526     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
527   }
528
529   /**
530    * Returns the index of the last segment in the log.
531    *
532    * @param index the compaction index
533    * @return the starting index of the last segment in the log
534    */
535   public long getCompactableIndex(long index) {
536     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
537     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
538   }
539
540   /**
541    * Compacts the journal up to the given index.
542    * <p>
543    * The semantics of compaction are not specified by this interface.
544    *
545    * @param index The index up to which to compact the journal.
546    */
547   public void compact(long index) {
548     final var segmentEntry = segments.floorEntry(index);
549     if (segmentEntry != null) {
550       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
551       if (!compactSegments.isEmpty()) {
552         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
553         for (JournalSegment segment : compactSegments.values()) {
554           LOG.trace("Deleting segment: {}", segment);
555           segment.close();
556           segment.delete();
557         }
558         compactSegments.clear();
559         resetHead(segmentEntry.getValue().firstIndex());
560       }
561     }
562   }
563
564   @Override
565   public void close() {
566     segments.values().forEach(segment -> {
567       LOG.debug("Closing segment: {}", segment);
568       segment.close();
569     });
570     currentSegment = null;
571     open = false;
572   }
573
574   /**
575    * Returns whether {@code flushOnCommit} is enabled for the log.
576    *
577    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
578    */
579   boolean isFlushOnCommit() {
580     return flushOnCommit;
581   }
582
583   /**
584    * Commits entries up to the given index.
585    *
586    * @param index The index up to which to commit entries.
587    */
588   void setCommitIndex(long index) {
589     this.commitIndex = index;
590   }
591
592   /**
593    * Returns the Raft log commit index.
594    *
595    * @return The Raft log commit index.
596    */
597   long getCommitIndex() {
598     return commitIndex;
599   }
600
601   /**
602    * Raft log builder.
603    */
604   public static final class Builder<E> {
605     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
606     private static final String DEFAULT_NAME = "atomix";
607     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
608     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
609     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
610     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
611     private static final double DEFAULT_INDEX_DENSITY = .005;
612
613     private String name = DEFAULT_NAME;
614     private StorageLevel storageLevel = StorageLevel.DISK;
615     private File directory = new File(DEFAULT_DIRECTORY);
616     private JournalSerdes namespace;
617     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
618     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
619     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
620     private double indexDensity = DEFAULT_INDEX_DENSITY;
621     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
622
623     Builder() {
624       // Hidden on purpose
625     }
626
627     /**
628      * Sets the storage name.
629      *
630      * @param name The storage name.
631      * @return The storage builder.
632      */
633     public Builder<E> withName(String name) {
634       this.name = requireNonNull(name, "name cannot be null");
635       return this;
636     }
637
638     /**
639      * Sets the log storage level, returning the builder for method chaining.
640      * <p>
641      * The storage level indicates how individual entries should be persisted in the journal.
642      *
643      * @param storageLevel The log storage level.
644      * @return The storage builder.
645      */
646     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
647       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
648       return this;
649     }
650
651     /**
652      * Sets the log directory, returning the builder for method chaining.
653      * <p>
654      * The log will write segment files into the provided directory.
655      *
656      * @param directory The log directory.
657      * @return The storage builder.
658      * @throws NullPointerException If the {@code directory} is {@code null}
659      */
660     public Builder<E> withDirectory(String directory) {
661       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
662     }
663
664     /**
665      * Sets the log directory, returning the builder for method chaining.
666      * <p>
667      * The log will write segment files into the provided directory.
668      *
669      * @param directory The log directory.
670      * @return The storage builder.
671      * @throws NullPointerException If the {@code directory} is {@code null}
672      */
673     public Builder<E> withDirectory(File directory) {
674       this.directory = requireNonNull(directory, "directory cannot be null");
675       return this;
676     }
677
678     /**
679      * Sets the journal namespace, returning the builder for method chaining.
680      *
681      * @param namespace The journal serializer.
682      * @return The journal builder.
683      */
684     public Builder<E> withNamespace(JournalSerdes namespace) {
685       this.namespace = requireNonNull(namespace, "namespace cannot be null");
686       return this;
687     }
688
689     /**
690      * Sets the maximum segment size in bytes, returning the builder for method chaining.
691      * <p>
692      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
693      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
694      * segment and append new entries to that segment.
695      * <p>
696      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
697      *
698      * @param maxSegmentSize The maximum segment size in bytes.
699      * @return The storage builder.
700      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
701      */
702     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
703       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
704           "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
705       this.maxSegmentSize = maxSegmentSize;
706       return this;
707     }
708
709     /**
710      * Sets the maximum entry size in bytes, returning the builder for method chaining.
711      *
712      * @param maxEntrySize the maximum entry size in bytes
713      * @return the storage builder
714      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
715      */
716     public Builder<E> withMaxEntrySize(int maxEntrySize) {
717       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
718       this.maxEntrySize = maxEntrySize;
719       return this;
720     }
721
722     /**
723      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
724      * <p>
725      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
726      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
727      * new segment and append new entries to that segment.
728      * <p>
729      * By default, the maximum entries per segment is {@code 1024 * 1024}.
730      *
731      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
732      * @return The storage builder.
733      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
734      *     per segment
735      * @deprecated since 3.0.2
736      */
737     @Deprecated
738     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
739       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
740       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
741           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
742       this.maxEntriesPerSegment = maxEntriesPerSegment;
743       return this;
744     }
745
746     /**
747      * Sets the journal index density.
748      * <p>
749      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
750      * in-memory index for faster seeking.
751      *
752      * @param indexDensity the index density
753      * @return the journal builder
754      * @throws IllegalArgumentException if the density is not between 0 and 1
755      */
756     public Builder<E> withIndexDensity(double indexDensity) {
757       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
758       this.indexDensity = indexDensity;
759       return this;
760     }
761
762     /**
763      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
764      * chaining.
765      * <p>
766      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
767      * committed in a given segment.
768      *
769      * @return The storage builder.
770      */
771     public Builder<E> withFlushOnCommit() {
772       return withFlushOnCommit(true);
773     }
774
775     /**
776      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
777      * chaining.
778      * <p>
779      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
780      * committed in a given segment.
781      *
782      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
783      * @return The storage builder.
784      */
785     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
786       this.flushOnCommit = flushOnCommit;
787       return this;
788     }
789
790     /**
791      * Build the {@link SegmentedJournal}.
792      *
793      * @return A new {@link SegmentedJournal}.
794      */
795     public SegmentedJournal<E> build() {
796       return new SegmentedJournal<>(
797           name,
798           storageLevel,
799           directory,
800           namespace,
801           maxSegmentSize,
802           maxEntrySize,
803           maxEntriesPerSegment,
804           indexDensity,
805           flushOnCommit);
806     }
807   }
808 }