Retain RandomAccessFile in JournalSegmentFile
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.Map;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Segmented journal.
35  */
36 public final class SegmentedJournal<E> implements Journal<E> {
37   /**
38    * Returns a new Raft log builder.
39    *
40    * @return A new Raft log builder.
41    */
42   public static <E> Builder<E> builder() {
43     return new Builder<>();
44   }
45
46   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
47   private static final int SEGMENT_BUFFER_FACTOR = 3;
48
49   private final String name;
50   private final StorageLevel storageLevel;
51   private final File directory;
52   private final JournalSerializer<E> serializer;
53   private final int maxSegmentSize;
54   private final int maxEntrySize;
55   private final int maxEntriesPerSegment;
56   private final double indexDensity;
57   private final boolean flushOnCommit;
58   private final SegmentedJournalWriter<E> writer;
59   private volatile long commitIndex;
60
61   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
62   private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
63   private JournalSegment currentSegment;
64
65   private volatile boolean open = true;
66
67   public SegmentedJournal(
68       String name,
69       StorageLevel storageLevel,
70       File directory,
71       JournalSerdes namespace,
72       int maxSegmentSize,
73       int maxEntrySize,
74       int maxEntriesPerSegment,
75       double indexDensity,
76       boolean flushOnCommit) {
77     this.name = requireNonNull(name, "name cannot be null");
78     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
79     this.directory = requireNonNull(directory, "directory cannot be null");
80     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
81     this.maxSegmentSize = maxSegmentSize;
82     this.maxEntrySize = maxEntrySize;
83     this.maxEntriesPerSegment = maxEntriesPerSegment;
84     this.indexDensity = indexDensity;
85     this.flushOnCommit = flushOnCommit;
86     open();
87     this.writer = new SegmentedJournalWriter<>(this);
88   }
89
90   /**
91    * Returns the segment file name prefix.
92    *
93    * @return The segment file name prefix.
94    */
95   public String name() {
96     return name;
97   }
98
99   /**
100    * Returns the storage directory.
101    * <p>
102    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
103    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
104    * when the log is opened.
105    *
106    * @return The storage directory.
107    */
108   public File directory() {
109     return directory;
110   }
111
112   /**
113    * Returns the storage level.
114    * <p>
115    * The storage level dictates how entries within individual journal segments should be stored.
116    *
117    * @return The storage level.
118    */
119   public StorageLevel storageLevel() {
120     return storageLevel;
121   }
122
123   /**
124    * Returns the maximum journal segment size.
125    * <p>
126    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
127    *
128    * @return The maximum segment size in bytes.
129    */
130   public int maxSegmentSize() {
131     return maxSegmentSize;
132   }
133
134   /**
135    * Returns the maximum journal entry size.
136    * <p>
137    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
138    *
139    * @return the maximum entry size in bytes
140    */
141   public int maxEntrySize() {
142     return maxEntrySize;
143   }
144
145   /**
146    * Returns the maximum number of entries per segment.
147    * <p>
148    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
149    * in a journal.
150    *
151    * @return The maximum number of entries per segment.
152    * @deprecated since 3.0.2
153    */
154   @Deprecated
155   public int maxEntriesPerSegment() {
156     return maxEntriesPerSegment;
157   }
158
159   /**
160    * Returns the collection of journal segments.
161    *
162    * @return the collection of journal segments
163    */
164   public Collection<JournalSegment> segments() {
165     return segments.values();
166   }
167
168   /**
169    * Returns the collection of journal segments with indexes greater than the given index.
170    *
171    * @param index the starting index
172    * @return the journal segments starting with indexes greater than or equal to the given index
173    */
174   public Collection<JournalSegment> segments(long index) {
175     return segments.tailMap(index).values();
176   }
177
178   /**
179    * Returns serializer instance.
180    *
181    * @return serializer instance
182    */
183   JournalSerializer<E> serializer() {
184     return serializer;
185   }
186
187   /**
188    * Returns the total size of the journal.
189    *
190    * @return the total size of the journal
191    */
192   public long size() {
193     return segments.values().stream()
194         .mapToLong(segment -> {
195           try {
196             return segment.file().size();
197           } catch (IOException e) {
198             throw new StorageException(e);
199           }
200         })
201         .sum();
202   }
203
204   @Override
205   public JournalWriter<E> writer() {
206     return writer;
207   }
208
209   @Override
210   public JournalReader<E> openReader(long index) {
211     return openReader(index, JournalReader.Mode.ALL);
212   }
213
214   /**
215    * Opens a new Raft log reader with the given reader mode.
216    *
217    * @param index The index from which to begin reading entries.
218    * @param mode The mode in which to read entries.
219    * @return The Raft log reader.
220    */
221   @Override
222   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
223     final var segment = getSegment(index);
224     final var reader = switch (mode) {
225       case ALL -> new SegmentedJournalReader<>(this, segment);
226       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
227     };
228
229     // Forward reader to specified index
230     long next = reader.getNextIndex();
231     while (index > next && reader.tryAdvance()) {
232         next = reader.getNextIndex();
233     }
234
235     readers.add(reader);
236     return reader;
237   }
238
239   /**
240    * Opens the segments.
241    */
242   private synchronized void open() {
243     // Load existing log segments from disk.
244     for (var segment : loadSegments()) {
245       segments.put(segment.firstIndex(), segment);
246     }
247
248     // If a segment doesn't already exist, create an initial segment starting at index 1.
249     if (!segments.isEmpty()) {
250       currentSegment = segments.lastEntry().getValue();
251     } else {
252       currentSegment = createSegment(1, 1);
253       segments.put(1L, currentSegment);
254     }
255   }
256
257   /**
258    * Asserts that the manager is open.
259    *
260    * @throws IllegalStateException if the segment manager is not open
261    */
262   private void assertOpen() {
263     checkState(currentSegment != null, "journal not open");
264   }
265
266   /**
267    * Asserts that enough disk space is available to allocate a new segment.
268    */
269   private void assertDiskSpace() {
270     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
271       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
272     }
273   }
274
275   /**
276    * Resets the current segment, creating a new segment if necessary.
277    */
278   private synchronized void resetCurrentSegment() {
279     final var lastSegment = getLastSegment();
280     if (lastSegment == null) {
281       currentSegment = createSegment(1, 1);
282       segments.put(1L, currentSegment);
283     } else {
284       currentSegment = lastSegment;
285     }
286   }
287
288   /**
289    * Resets and returns the first segment in the journal.
290    *
291    * @param index the starting index of the journal
292    * @return the first segment
293    */
294   JournalSegment resetSegments(long index) {
295     assertOpen();
296
297     // If the index already equals the first segment index, skip the reset.
298     final var firstSegment = getFirstSegment();
299     if (index == firstSegment.firstIndex()) {
300       return firstSegment;
301     }
302
303     segments.values().forEach(JournalSegment::delete);
304     segments.clear();
305
306     currentSegment = createSegment(1, index);
307     segments.put(index, currentSegment);
308     return currentSegment;
309   }
310
311   /**
312    * Returns the first segment in the log.
313    *
314    * @throws IllegalStateException if the segment manager is not open
315    */
316   JournalSegment getFirstSegment() {
317     assertOpen();
318     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
319     return segment != null ? segment.getValue() : null;
320   }
321
322   /**
323    * Returns the last segment in the log.
324    *
325    * @throws IllegalStateException if the segment manager is not open
326    */
327   JournalSegment getLastSegment() {
328     assertOpen();
329     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
330     return segment != null ? segment.getValue() : null;
331   }
332
333   /**
334    * Creates and returns the next segment.
335    *
336    * @return The next segment.
337    * @throws IllegalStateException if the segment manager is not open
338    */
339   synchronized JournalSegment getNextSegment() {
340     assertOpen();
341     assertDiskSpace();
342
343     final var index = currentSegment.lastIndex() + 1;
344     final var lastSegment = getLastSegment();
345     currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index);
346     segments.put(index, currentSegment);
347     return currentSegment;
348   }
349
350   /**
351    * Returns the segment following the segment with the given ID.
352    *
353    * @param index The segment index with which to look up the next segment.
354    * @return The next segment for the given index.
355    */
356   JournalSegment getNextSegment(long index) {
357     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
358     return nextSegment != null ? nextSegment.getValue() : null;
359   }
360
361   /**
362    * Returns the segment for the given index.
363    *
364    * @param index The index for which to return the segment.
365    * @throws IllegalStateException if the segment manager is not open
366    */
367   synchronized JournalSegment getSegment(long index) {
368     assertOpen();
369     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
370     if (currentSegment != null && index > currentSegment.firstIndex()) {
371       return currentSegment;
372     }
373
374     // If the index is in another segment, get the entry with the next lowest first index.
375     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
376     if (segment != null) {
377       return segment.getValue();
378     }
379     return getFirstSegment();
380   }
381
382   /**
383    * Removes a segment.
384    *
385    * @param segment The segment to remove.
386    */
387   synchronized void removeSegment(JournalSegment segment) {
388     segments.remove(segment.firstIndex());
389     segment.delete();
390     resetCurrentSegment();
391   }
392
393   /**
394    * Creates a new segment.
395    */
396   JournalSegment createSegment(long id, long index) {
397     final JournalSegmentFile file;
398     try {
399       file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
400           .withId(id)
401           .withIndex(index)
402           .withMaxSegmentSize(maxSegmentSize)
403           .withMaxEntries(maxEntriesPerSegment)
404           .withUpdated(System.currentTimeMillis())
405           .build());
406     } catch (IOException e) {
407       throw new StorageException(e);
408     }
409
410     final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
411     LOG.debug("Created segment: {}", segment);
412     return segment;
413   }
414
415   /**
416    * Loads all segments from disk.
417    *
418    * @return A collection of segments for the log.
419    */
420   protected Collection<JournalSegment> loadSegments() {
421     // Ensure log directories are created.
422     directory.mkdirs();
423
424     final var segments = new TreeMap<Long, JournalSegment>();
425
426     // Iterate through all files in the log directory.
427     for (var file : directory.listFiles(File::isFile)) {
428
429       // If the file looks like a segment file, attempt to load the segment.
430       if (JournalSegmentFile.isSegmentFile(name, file)) {
431         final JournalSegmentFile segmentFile;
432         try {
433           segmentFile = JournalSegmentFile.openExisting(file.toPath());
434         } catch (IOException e) {
435           throw new StorageException(e);
436         }
437
438         // Load the segment.
439         LOG.debug("Loaded disk segment: {} ({})", segmentFile.descriptor().id(), segmentFile.path());
440
441         // Add the segment to the segments list.
442         final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
443         segments.put(segment.firstIndex(), segment);
444       }
445     }
446
447     // Verify that all the segments in the log align with one another.
448     JournalSegment previousSegment = null;
449     boolean corrupted = false;
450     final var iterator = segments.entrySet().iterator();
451     while (iterator.hasNext()) {
452       final var segment = iterator.next().getValue();
453       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
454         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
455             previousSegment.file().path());
456         corrupted = true;
457       }
458       if (corrupted) {
459         segment.delete();
460         iterator.remove();
461       }
462       previousSegment = segment;
463     }
464
465     return segments.values();
466   }
467
468   /**
469    * Resets journal readers to the given head.
470    *
471    * @param index The index at which to reset readers.
472    */
473   void resetHead(long index) {
474     for (var reader : readers) {
475       if (reader.getNextIndex() < index) {
476         reader.reset(index);
477       }
478     }
479   }
480
481   /**
482    * Resets journal readers to the given tail.
483    *
484    * @param index The index at which to reset readers.
485    */
486   void resetTail(long index) {
487     for (var reader : readers) {
488       if (reader.getNextIndex() >= index) {
489         reader.reset(index);
490       }
491     }
492   }
493
494   void closeReader(SegmentedJournalReader<E> reader) {
495     readers.remove(reader);
496   }
497
498   @Override
499   public boolean isOpen() {
500     return open;
501   }
502
503   /**
504    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
505    *
506    * @param index the index from which to remove segments
507    * @return indicates whether a segment can be removed from the journal
508    */
509   public boolean isCompactable(long index) {
510     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
511     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
512   }
513
514   /**
515    * Returns the index of the last segment in the log.
516    *
517    * @param index the compaction index
518    * @return the starting index of the last segment in the log
519    */
520   public long getCompactableIndex(long index) {
521     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
522     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
523   }
524
525   /**
526    * Compacts the journal up to the given index.
527    * <p>
528    * The semantics of compaction are not specified by this interface.
529    *
530    * @param index The index up to which to compact the journal.
531    */
532   public void compact(long index) {
533     final var segmentEntry = segments.floorEntry(index);
534     if (segmentEntry != null) {
535       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
536       if (!compactSegments.isEmpty()) {
537         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
538         compactSegments.values().forEach(JournalSegment::delete);
539         compactSegments.clear();
540         resetHead(segmentEntry.getValue().firstIndex());
541       }
542     }
543   }
544
545   @Override
546   public void close() {
547     segments.values().forEach(JournalSegment::close);
548     currentSegment = null;
549     open = false;
550   }
551
552   /**
553    * Returns whether {@code flushOnCommit} is enabled for the log.
554    *
555    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
556    */
557   boolean isFlushOnCommit() {
558     return flushOnCommit;
559   }
560
561   /**
562    * Commits entries up to the given index.
563    *
564    * @param index The index up to which to commit entries.
565    */
566   void setCommitIndex(long index) {
567     this.commitIndex = index;
568   }
569
570   /**
571    * Returns the Raft log commit index.
572    *
573    * @return The Raft log commit index.
574    */
575   long getCommitIndex() {
576     return commitIndex;
577   }
578
579   /**
580    * Raft log builder.
581    */
582   public static final class Builder<E> {
583     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
584     private static final String DEFAULT_NAME = "atomix";
585     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
586     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
587     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
588     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
589     private static final double DEFAULT_INDEX_DENSITY = .005;
590
591     private String name = DEFAULT_NAME;
592     private StorageLevel storageLevel = StorageLevel.DISK;
593     private File directory = new File(DEFAULT_DIRECTORY);
594     private JournalSerdes namespace;
595     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
596     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
597     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
598     private double indexDensity = DEFAULT_INDEX_DENSITY;
599     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
600
601     Builder() {
602       // Hidden on purpose
603     }
604
605     /**
606      * Sets the storage name.
607      *
608      * @param name The storage name.
609      * @return The storage builder.
610      */
611     public Builder<E> withName(String name) {
612       this.name = requireNonNull(name, "name cannot be null");
613       return this;
614     }
615
616     /**
617      * Sets the log storage level, returning the builder for method chaining.
618      * <p>
619      * The storage level indicates how individual entries should be persisted in the journal.
620      *
621      * @param storageLevel The log storage level.
622      * @return The storage builder.
623      */
624     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
625       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
626       return this;
627     }
628
629     /**
630      * Sets the log directory, returning the builder for method chaining.
631      * <p>
632      * The log will write segment files into the provided directory.
633      *
634      * @param directory The log directory.
635      * @return The storage builder.
636      * @throws NullPointerException If the {@code directory} is {@code null}
637      */
638     public Builder<E> withDirectory(String directory) {
639       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
640     }
641
642     /**
643      * Sets the log directory, returning the builder for method chaining.
644      * <p>
645      * The log will write segment files into the provided directory.
646      *
647      * @param directory The log directory.
648      * @return The storage builder.
649      * @throws NullPointerException If the {@code directory} is {@code null}
650      */
651     public Builder<E> withDirectory(File directory) {
652       this.directory = requireNonNull(directory, "directory cannot be null");
653       return this;
654     }
655
656     /**
657      * Sets the journal namespace, returning the builder for method chaining.
658      *
659      * @param namespace The journal serializer.
660      * @return The journal builder.
661      */
662     public Builder<E> withNamespace(JournalSerdes namespace) {
663       this.namespace = requireNonNull(namespace, "namespace cannot be null");
664       return this;
665     }
666
667     /**
668      * Sets the maximum segment size in bytes, returning the builder for method chaining.
669      * <p>
670      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
671      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
672      * segment and append new entries to that segment.
673      * <p>
674      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
675      *
676      * @param maxSegmentSize The maximum segment size in bytes.
677      * @return The storage builder.
678      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
679      */
680     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
681       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
682           "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
683       this.maxSegmentSize = maxSegmentSize;
684       return this;
685     }
686
687     /**
688      * Sets the maximum entry size in bytes, returning the builder for method chaining.
689      *
690      * @param maxEntrySize the maximum entry size in bytes
691      * @return the storage builder
692      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
693      */
694     public Builder<E> withMaxEntrySize(int maxEntrySize) {
695       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
696       this.maxEntrySize = maxEntrySize;
697       return this;
698     }
699
700     /**
701      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
702      * <p>
703      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
704      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
705      * new segment and append new entries to that segment.
706      * <p>
707      * By default, the maximum entries per segment is {@code 1024 * 1024}.
708      *
709      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
710      * @return The storage builder.
711      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
712      *     per segment
713      * @deprecated since 3.0.2
714      */
715     @Deprecated
716     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
717       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
718       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
719           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
720       this.maxEntriesPerSegment = maxEntriesPerSegment;
721       return this;
722     }
723
724     /**
725      * Sets the journal index density.
726      * <p>
727      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
728      * in-memory index for faster seeking.
729      *
730      * @param indexDensity the index density
731      * @return the journal builder
732      * @throws IllegalArgumentException if the density is not between 0 and 1
733      */
734     public Builder<E> withIndexDensity(double indexDensity) {
735       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
736       this.indexDensity = indexDensity;
737       return this;
738     }
739
740     /**
741      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
742      * chaining.
743      * <p>
744      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
745      * committed in a given segment.
746      *
747      * @return The storage builder.
748      */
749     public Builder<E> withFlushOnCommit() {
750       return withFlushOnCommit(true);
751     }
752
753     /**
754      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
755      * chaining.
756      * <p>
757      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
758      * committed in a given segment.
759      *
760      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
761      * @return The storage builder.
762      */
763     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
764       this.flushOnCommit = flushOnCommit;
765       return this;
766     }
767
768     /**
769      * Build the {@link SegmentedJournal}.
770      *
771      * @return A new {@link SegmentedJournal}.
772      */
773     public SegmentedJournal<E> build() {
774       return new SegmentedJournal<>(
775           name,
776           storageLevel,
777           directory,
778           namespace,
779           maxSegmentSize,
780           maxEntrySize,
781           maxEntriesPerSegment,
782           indexDensity,
783           flushOnCommit);
784     }
785   }
786 }