Refactor JournalSegmentDescriptor
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.util.Collection;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentNavigableMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Segmented journal.
36  */
37 public final class SegmentedJournal<E> implements Journal<E> {
38   /**
39    * Returns a new Raft log builder.
40    *
41    * @return A new Raft log builder.
42    */
43   public static <E> Builder<E> builder() {
44     return new Builder<>();
45   }
46
47   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
48   private static final int SEGMENT_BUFFER_FACTOR = 3;
49
50   private final String name;
51   private final StorageLevel storageLevel;
52   private final File directory;
53   private final JournalSerializer<E> serializer;
54   private final int maxSegmentSize;
55   private final int maxEntrySize;
56   private final int maxEntriesPerSegment;
57   private final double indexDensity;
58   private final boolean flushOnCommit;
59   private final SegmentedJournalWriter<E> writer;
60   private volatile long commitIndex;
61
62   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
63   private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
64   private JournalSegment currentSegment;
65
66   private volatile boolean open = true;
67
68   public SegmentedJournal(
69       String name,
70       StorageLevel storageLevel,
71       File directory,
72       JournalSerdes namespace,
73       int maxSegmentSize,
74       int maxEntrySize,
75       int maxEntriesPerSegment,
76       double indexDensity,
77       boolean flushOnCommit) {
78     this.name = requireNonNull(name, "name cannot be null");
79     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
80     this.directory = requireNonNull(directory, "directory cannot be null");
81     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
82     this.maxSegmentSize = maxSegmentSize;
83     this.maxEntrySize = maxEntrySize;
84     this.maxEntriesPerSegment = maxEntriesPerSegment;
85     this.indexDensity = indexDensity;
86     this.flushOnCommit = flushOnCommit;
87     open();
88     this.writer = new SegmentedJournalWriter<>(this);
89   }
90
91   /**
92    * Returns the segment file name prefix.
93    *
94    * @return The segment file name prefix.
95    */
96   public String name() {
97     return name;
98   }
99
100   /**
101    * Returns the storage directory.
102    * <p>
103    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
104    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
105    * when the log is opened.
106    *
107    * @return The storage directory.
108    */
109   public File directory() {
110     return directory;
111   }
112
113   /**
114    * Returns the storage level.
115    * <p>
116    * The storage level dictates how entries within individual journal segments should be stored.
117    *
118    * @return The storage level.
119    */
120   public StorageLevel storageLevel() {
121     return storageLevel;
122   }
123
124   /**
125    * Returns the maximum journal segment size.
126    * <p>
127    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
128    *
129    * @return The maximum segment size in bytes.
130    */
131   public int maxSegmentSize() {
132     return maxSegmentSize;
133   }
134
135   /**
136    * Returns the maximum journal entry size.
137    * <p>
138    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
139    *
140    * @return the maximum entry size in bytes
141    */
142   public int maxEntrySize() {
143     return maxEntrySize;
144   }
145
146   /**
147    * Returns the maximum number of entries per segment.
148    * <p>
149    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
150    * in a journal.
151    *
152    * @return The maximum number of entries per segment.
153    * @deprecated since 3.0.2
154    */
155   @Deprecated
156   public int maxEntriesPerSegment() {
157     return maxEntriesPerSegment;
158   }
159
160   /**
161    * Returns the collection of journal segments.
162    *
163    * @return the collection of journal segments
164    */
165   public Collection<JournalSegment> segments() {
166     return segments.values();
167   }
168
169   /**
170    * Returns the collection of journal segments with indexes greater than the given index.
171    *
172    * @param index the starting index
173    * @return the journal segments starting with indexes greater than or equal to the given index
174    */
175   public Collection<JournalSegment> segments(long index) {
176     return segments.tailMap(index).values();
177   }
178
179   /**
180    * Returns serializer instance.
181    *
182    * @return serializer instance
183    */
184   JournalSerializer<E> serializer() {
185     return serializer;
186   }
187
188   /**
189    * Returns the total size of the journal.
190    *
191    * @return the total size of the journal
192    */
193   public long size() {
194     return segments.values().stream()
195         .mapToLong(JournalSegment::size)
196         .sum();
197   }
198
199   @Override
200   public JournalWriter<E> writer() {
201     return writer;
202   }
203
204   @Override
205   public JournalReader<E> openReader(long index) {
206     return openReader(index, JournalReader.Mode.ALL);
207   }
208
209   /**
210    * Opens a new Raft log reader with the given reader mode.
211    *
212    * @param index The index from which to begin reading entries.
213    * @param mode The mode in which to read entries.
214    * @return The Raft log reader.
215    */
216   @Override
217   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
218     final var segment = getSegment(index);
219     final var reader = switch (mode) {
220       case ALL -> new SegmentedJournalReader<>(this, segment);
221       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
222     };
223
224     // Forward reader to specified index
225     long next = reader.getNextIndex();
226     while (index > next && reader.tryAdvance()) {
227         next = reader.getNextIndex();
228     }
229
230     readers.add(reader);
231     return reader;
232   }
233
234   /**
235    * Opens the segments.
236    */
237   private synchronized void open() {
238     // Load existing log segments from disk.
239     for (JournalSegment segment : loadSegments()) {
240       segments.put(segment.descriptor().index(), segment);
241     }
242
243     // If a segment doesn't already exist, create an initial segment starting at index 1.
244     if (!segments.isEmpty()) {
245       currentSegment = segments.lastEntry().getValue();
246     } else {
247       currentSegment = createSegment(1, 1);
248       segments.put(1L, currentSegment);
249     }
250   }
251
252   /**
253    * Asserts that the manager is open.
254    *
255    * @throws IllegalStateException if the segment manager is not open
256    */
257   private void assertOpen() {
258     checkState(currentSegment != null, "journal not open");
259   }
260
261   /**
262    * Asserts that enough disk space is available to allocate a new segment.
263    */
264   private void assertDiskSpace() {
265     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
266       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
267     }
268   }
269
270   /**
271    * Resets the current segment, creating a new segment if necessary.
272    */
273   private synchronized void resetCurrentSegment() {
274     final var lastSegment = getLastSegment();
275     if (lastSegment == null) {
276       currentSegment = createSegment(1, 1);
277       segments.put(1L, currentSegment);
278     } else {
279       currentSegment = lastSegment;
280     }
281   }
282
283   /**
284    * Resets and returns the first segment in the journal.
285    *
286    * @param index the starting index of the journal
287    * @return the first segment
288    */
289   JournalSegment resetSegments(long index) {
290     assertOpen();
291
292     // If the index already equals the first segment index, skip the reset.
293     JournalSegment firstSegment = getFirstSegment();
294     if (index == firstSegment.firstIndex()) {
295       return firstSegment;
296     }
297
298     for (JournalSegment segment : segments.values()) {
299       segment.close();
300       segment.delete();
301     }
302     segments.clear();
303
304     currentSegment = createSegment(1, index);
305     segments.put(index, currentSegment);
306     return currentSegment;
307   }
308
309   /**
310    * Returns the first segment in the log.
311    *
312    * @throws IllegalStateException if the segment manager is not open
313    */
314   JournalSegment getFirstSegment() {
315     assertOpen();
316     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
317     return segment != null ? segment.getValue() : null;
318   }
319
320   /**
321    * Returns the last segment in the log.
322    *
323    * @throws IllegalStateException if the segment manager is not open
324    */
325   JournalSegment getLastSegment() {
326     assertOpen();
327     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
328     return segment != null ? segment.getValue() : null;
329   }
330
331   /**
332    * Creates and returns the next segment.
333    *
334    * @return The next segment.
335    * @throws IllegalStateException if the segment manager is not open
336    */
337   synchronized JournalSegment getNextSegment() {
338     assertOpen();
339     assertDiskSpace();
340
341     final var index = currentSegment.lastIndex() + 1;
342     final var lastSegment = getLastSegment();
343     currentSegment = createSegment(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1, index);
344     segments.put(index, currentSegment);
345     return currentSegment;
346   }
347
348   /**
349    * Returns the segment following the segment with the given ID.
350    *
351    * @param index The segment index with which to look up the next segment.
352    * @return The next segment for the given index.
353    */
354   JournalSegment getNextSegment(long index) {
355     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
356     return nextSegment != null ? nextSegment.getValue() : null;
357   }
358
359   /**
360    * Returns the segment for the given index.
361    *
362    * @param index The index for which to return the segment.
363    * @throws IllegalStateException if the segment manager is not open
364    */
365   synchronized JournalSegment getSegment(long index) {
366     assertOpen();
367     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
368     if (currentSegment != null && index > currentSegment.firstIndex()) {
369       return currentSegment;
370     }
371
372     // If the index is in another segment, get the entry with the next lowest first index.
373     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
374     if (segment != null) {
375       return segment.getValue();
376     }
377     return getFirstSegment();
378   }
379
380   /**
381    * Removes a segment.
382    *
383    * @param segment The segment to remove.
384    */
385   synchronized void removeSegment(JournalSegment segment) {
386     segments.remove(segment.firstIndex());
387     segment.close();
388     segment.delete();
389     resetCurrentSegment();
390   }
391
392   /**
393    * Creates a new segment.
394    */
395   JournalSegment createSegment(long id, long index) {
396     final var segmentFile = JournalSegmentFile.createSegmentFile(name, directory, id);
397     final var descriptor = JournalSegmentDescriptor.builder()
398         .withId(id)
399         .withIndex(index)
400         .withMaxSegmentSize(maxSegmentSize)
401         .withMaxEntries(maxEntriesPerSegment)
402         .withUpdated(System.currentTimeMillis())
403         .build();
404
405     try (var raf = new RandomAccessFile(segmentFile, "rw")) {
406       raf.setLength(maxSegmentSize);
407       raf.write(descriptor.toArray());
408     } catch (IOException e) {
409       throw new StorageException(e);
410     }
411
412     final var segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
413     LOG.debug("Created segment: {}", segment);
414     return segment;
415   }
416
417   /**
418    * Creates a new segment instance.
419    *
420    * @param segmentFile The segment file.
421    * @param descriptor The segment descriptor.
422    * @return The segment instance.
423    */
424   protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
425     return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
426   }
427
428   /**
429    * Loads all segments from disk.
430    *
431    * @return A collection of segments for the log.
432    */
433   protected Collection<JournalSegment> loadSegments() {
434     // Ensure log directories are created.
435     directory.mkdirs();
436
437     final var segments = new TreeMap<Long, JournalSegment>();
438
439     // Iterate through all files in the log directory.
440     for (var file : directory.listFiles(File::isFile)) {
441
442       // If the file looks like a segment file, attempt to load the segment.
443       if (JournalSegmentFile.isSegmentFile(name, file)) {
444         // read the descriptor
445         final JournalSegmentDescriptor descriptor;
446         try {
447           descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
448         } catch (IOException e) {
449           throw new StorageException(e);
450         }
451
452         // Load the segment.
453         final var segmentFile = new JournalSegmentFile(file);
454         final var segment = newSegment(segmentFile, descriptor);
455         LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
456
457         // Add the segment to the segments list.
458         segments.put(segment.firstIndex(), segment);
459       }
460     }
461
462     // Verify that all the segments in the log align with one another.
463     JournalSegment previousSegment = null;
464     boolean corrupted = false;
465     final var iterator = segments.entrySet().iterator();
466     while (iterator.hasNext()) {
467       final var segment = iterator.next().getValue();
468       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
469         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
470             previousSegment.file().file());
471         corrupted = true;
472       }
473       if (corrupted) {
474         segment.close();
475         segment.delete();
476         iterator.remove();
477       }
478       previousSegment = segment;
479     }
480
481     return segments.values();
482   }
483
484   /**
485    * Resets journal readers to the given head.
486    *
487    * @param index The index at which to reset readers.
488    */
489   void resetHead(long index) {
490     for (var reader : readers) {
491       if (reader.getNextIndex() < index) {
492         reader.reset(index);
493       }
494     }
495   }
496
497   /**
498    * Resets journal readers to the given tail.
499    *
500    * @param index The index at which to reset readers.
501    */
502   void resetTail(long index) {
503     for (var reader : readers) {
504       if (reader.getNextIndex() >= index) {
505         reader.reset(index);
506       }
507     }
508   }
509
510   void closeReader(SegmentedJournalReader<E> reader) {
511     readers.remove(reader);
512   }
513
514   @Override
515   public boolean isOpen() {
516     return open;
517   }
518
519   /**
520    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
521    *
522    * @param index the index from which to remove segments
523    * @return indicates whether a segment can be removed from the journal
524    */
525   public boolean isCompactable(long index) {
526     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
527     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
528   }
529
530   /**
531    * Returns the index of the last segment in the log.
532    *
533    * @param index the compaction index
534    * @return the starting index of the last segment in the log
535    */
536   public long getCompactableIndex(long index) {
537     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
538     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
539   }
540
541   /**
542    * Compacts the journal up to the given index.
543    * <p>
544    * The semantics of compaction are not specified by this interface.
545    *
546    * @param index The index up to which to compact the journal.
547    */
548   public void compact(long index) {
549     final var segmentEntry = segments.floorEntry(index);
550     if (segmentEntry != null) {
551       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
552       if (!compactSegments.isEmpty()) {
553         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
554         for (JournalSegment segment : compactSegments.values()) {
555           LOG.trace("Deleting segment: {}", segment);
556           segment.close();
557           segment.delete();
558         }
559         compactSegments.clear();
560         resetHead(segmentEntry.getValue().firstIndex());
561       }
562     }
563   }
564
565   @Override
566   public void close() {
567     segments.values().forEach(segment -> {
568       LOG.debug("Closing segment: {}", segment);
569       segment.close();
570     });
571     currentSegment = null;
572     open = false;
573   }
574
575   /**
576    * Returns whether {@code flushOnCommit} is enabled for the log.
577    *
578    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
579    */
580   boolean isFlushOnCommit() {
581     return flushOnCommit;
582   }
583
584   /**
585    * Commits entries up to the given index.
586    *
587    * @param index The index up to which to commit entries.
588    */
589   void setCommitIndex(long index) {
590     this.commitIndex = index;
591   }
592
593   /**
594    * Returns the Raft log commit index.
595    *
596    * @return The Raft log commit index.
597    */
598   long getCommitIndex() {
599     return commitIndex;
600   }
601
602   /**
603    * Raft log builder.
604    */
605   public static final class Builder<E> {
606     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
607     private static final String DEFAULT_NAME = "atomix";
608     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
609     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
610     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
611     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
612     private static final double DEFAULT_INDEX_DENSITY = .005;
613
614     private String name = DEFAULT_NAME;
615     private StorageLevel storageLevel = StorageLevel.DISK;
616     private File directory = new File(DEFAULT_DIRECTORY);
617     private JournalSerdes namespace;
618     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
619     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
620     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
621     private double indexDensity = DEFAULT_INDEX_DENSITY;
622     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
623
624     Builder() {
625       // Hidden on purpose
626     }
627
628     /**
629      * Sets the storage name.
630      *
631      * @param name The storage name.
632      * @return The storage builder.
633      */
634     public Builder<E> withName(String name) {
635       this.name = requireNonNull(name, "name cannot be null");
636       return this;
637     }
638
639     /**
640      * Sets the log storage level, returning the builder for method chaining.
641      * <p>
642      * The storage level indicates how individual entries should be persisted in the journal.
643      *
644      * @param storageLevel The log storage level.
645      * @return The storage builder.
646      */
647     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
648       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
649       return this;
650     }
651
652     /**
653      * Sets the log directory, returning the builder for method chaining.
654      * <p>
655      * The log will write segment files into the provided directory.
656      *
657      * @param directory The log directory.
658      * @return The storage builder.
659      * @throws NullPointerException If the {@code directory} is {@code null}
660      */
661     public Builder<E> withDirectory(String directory) {
662       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
663     }
664
665     /**
666      * Sets the log directory, returning the builder for method chaining.
667      * <p>
668      * The log will write segment files into the provided directory.
669      *
670      * @param directory The log directory.
671      * @return The storage builder.
672      * @throws NullPointerException If the {@code directory} is {@code null}
673      */
674     public Builder<E> withDirectory(File directory) {
675       this.directory = requireNonNull(directory, "directory cannot be null");
676       return this;
677     }
678
679     /**
680      * Sets the journal namespace, returning the builder for method chaining.
681      *
682      * @param namespace The journal serializer.
683      * @return The journal builder.
684      */
685     public Builder<E> withNamespace(JournalSerdes namespace) {
686       this.namespace = requireNonNull(namespace, "namespace cannot be null");
687       return this;
688     }
689
690     /**
691      * Sets the maximum segment size in bytes, returning the builder for method chaining.
692      * <p>
693      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
694      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
695      * segment and append new entries to that segment.
696      * <p>
697      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
698      *
699      * @param maxSegmentSize The maximum segment size in bytes.
700      * @return The storage builder.
701      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
702      */
703     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
704       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
705           "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
706       this.maxSegmentSize = maxSegmentSize;
707       return this;
708     }
709
710     /**
711      * Sets the maximum entry size in bytes, returning the builder for method chaining.
712      *
713      * @param maxEntrySize the maximum entry size in bytes
714      * @return the storage builder
715      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
716      */
717     public Builder<E> withMaxEntrySize(int maxEntrySize) {
718       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
719       this.maxEntrySize = maxEntrySize;
720       return this;
721     }
722
723     /**
724      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
725      * <p>
726      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
727      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
728      * new segment and append new entries to that segment.
729      * <p>
730      * By default, the maximum entries per segment is {@code 1024 * 1024}.
731      *
732      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
733      * @return The storage builder.
734      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
735      *     per segment
736      * @deprecated since 3.0.2
737      */
738     @Deprecated
739     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
740       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
741       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
742           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
743       this.maxEntriesPerSegment = maxEntriesPerSegment;
744       return this;
745     }
746
747     /**
748      * Sets the journal index density.
749      * <p>
750      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
751      * in-memory index for faster seeking.
752      *
753      * @param indexDensity the index density
754      * @return the journal builder
755      * @throws IllegalArgumentException if the density is not between 0 and 1
756      */
757     public Builder<E> withIndexDensity(double indexDensity) {
758       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
759       this.indexDensity = indexDensity;
760       return this;
761     }
762
763     /**
764      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
765      * chaining.
766      * <p>
767      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
768      * committed in a given segment.
769      *
770      * @return The storage builder.
771      */
772     public Builder<E> withFlushOnCommit() {
773       return withFlushOnCommit(true);
774     }
775
776     /**
777      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
778      * chaining.
779      * <p>
780      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
781      * committed in a given segment.
782      *
783      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
784      * @return The storage builder.
785      */
786     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
787       this.flushOnCommit = flushOnCommit;
788       return this;
789     }
790
791     /**
792      * Build the {@link SegmentedJournal}.
793      *
794      * @return A new {@link SegmentedJournal}.
795      */
796     public SegmentedJournal<E> build() {
797       return new SegmentedJournal<>(
798           name,
799           storageLevel,
800           directory,
801           namespace,
802           maxSegmentSize,
803           maxEntrySize,
804           maxEntriesPerSegment,
805           indexDensity,
806           flushOnCommit);
807     }
808   }
809 }