Refactor JournalReader.tryNext()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.StandardOpenOption;
28 import java.util.Collection;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.TreeMap;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentNavigableMap;
34 import java.util.concurrent.ConcurrentSkipListMap;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Segmented journal.
40  */
41 public final class SegmentedJournal<E> implements Journal<E> {
42   /**
43    * Returns a new Raft log builder.
44    *
45    * @return A new Raft log builder.
46    */
47   public static <E> Builder<E> builder() {
48     return new Builder<>();
49   }
50
51   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
52   private static final int SEGMENT_BUFFER_FACTOR = 3;
53
54   private final String name;
55   private final StorageLevel storageLevel;
56   private final File directory;
57   private final JournalSerializer<E> serializer;
58   private final int maxSegmentSize;
59   private final int maxEntrySize;
60   private final int maxEntriesPerSegment;
61   private final double indexDensity;
62   private final boolean flushOnCommit;
63   private final SegmentedJournalWriter<E> writer;
64   private volatile long commitIndex;
65
66   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
67   private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
68   private JournalSegment currentSegment;
69
70   private volatile boolean open = true;
71
72   public SegmentedJournal(
73       String name,
74       StorageLevel storageLevel,
75       File directory,
76       JournalSerdes namespace,
77       int maxSegmentSize,
78       int maxEntrySize,
79       int maxEntriesPerSegment,
80       double indexDensity,
81       boolean flushOnCommit) {
82     this.name = requireNonNull(name, "name cannot be null");
83     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
84     this.directory = requireNonNull(directory, "directory cannot be null");
85     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
86     this.maxSegmentSize = maxSegmentSize;
87     this.maxEntrySize = maxEntrySize;
88     this.maxEntriesPerSegment = maxEntriesPerSegment;
89     this.indexDensity = indexDensity;
90     this.flushOnCommit = flushOnCommit;
91     open();
92     this.writer = new SegmentedJournalWriter<>(this);
93   }
94
95   /**
96    * Returns the segment file name prefix.
97    *
98    * @return The segment file name prefix.
99    */
100   public String name() {
101     return name;
102   }
103
104   /**
105    * Returns the storage directory.
106    * <p>
107    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
108    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
109    * when the log is opened.
110    *
111    * @return The storage directory.
112    */
113   public File directory() {
114     return directory;
115   }
116
117   /**
118    * Returns the storage level.
119    * <p>
120    * The storage level dictates how entries within individual journal segments should be stored.
121    *
122    * @return The storage level.
123    */
124   public StorageLevel storageLevel() {
125     return storageLevel;
126   }
127
128   /**
129    * Returns the maximum journal segment size.
130    * <p>
131    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
132    *
133    * @return The maximum segment size in bytes.
134    */
135   public int maxSegmentSize() {
136     return maxSegmentSize;
137   }
138
139   /**
140    * Returns the maximum journal entry size.
141    * <p>
142    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
143    *
144    * @return the maximum entry size in bytes
145    */
146   public int maxEntrySize() {
147     return maxEntrySize;
148   }
149
150   /**
151    * Returns the maximum number of entries per segment.
152    * <p>
153    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
154    * in a journal.
155    *
156    * @return The maximum number of entries per segment.
157    * @deprecated since 3.0.2
158    */
159   @Deprecated
160   public int maxEntriesPerSegment() {
161     return maxEntriesPerSegment;
162   }
163
164   /**
165    * Returns the collection of journal segments.
166    *
167    * @return the collection of journal segments
168    */
169   public Collection<JournalSegment> segments() {
170     return segments.values();
171   }
172
173   /**
174    * Returns the collection of journal segments with indexes greater than the given index.
175    *
176    * @param index the starting index
177    * @return the journal segments starting with indexes greater than or equal to the given index
178    */
179   public Collection<JournalSegment> segments(long index) {
180     return segments.tailMap(index).values();
181   }
182
183   /**
184    * Returns serializer instance.
185    *
186    * @return serializer instance
187    */
188   JournalSerializer<E> serializer() {
189     return serializer;
190   }
191
192   /**
193    * Returns the total size of the journal.
194    *
195    * @return the total size of the journal
196    */
197   public long size() {
198     return segments.values().stream()
199         .mapToLong(segment -> segment.size())
200         .sum();
201   }
202
203   @Override
204   public JournalWriter<E> writer() {
205     return writer;
206   }
207
208   @Override
209   public JournalReader<E> openReader(long index) {
210     return openReader(index, JournalReader.Mode.ALL);
211   }
212
213   /**
214    * Opens a new Raft log reader with the given reader mode.
215    *
216    * @param index The index from which to begin reading entries.
217    * @param mode The mode in which to read entries.
218    * @return The Raft log reader.
219    */
220   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
221     final var segment = getSegment(index);
222     final var reader = switch (mode) {
223       case ALL -> new SegmentedJournalReader<>(this, segment);
224       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
225     };
226
227     // Forward reader to specified index
228     long next = reader.getNextIndex();
229     while (index > next && reader.tryAdvance()) {
230         next = reader.getNextIndex();
231     }
232
233     readers.add(reader);
234     return reader;
235   }
236
237   /**
238    * Opens the segments.
239    */
240   private synchronized void open() {
241     // Load existing log segments from disk.
242     for (JournalSegment segment : loadSegments()) {
243       segments.put(segment.descriptor().index(), segment);
244     }
245
246     // If a segment doesn't already exist, create an initial segment starting at index 1.
247     if (!segments.isEmpty()) {
248       currentSegment = segments.lastEntry().getValue();
249     } else {
250       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
251           .withId(1)
252           .withIndex(1)
253           .withMaxSegmentSize(maxSegmentSize)
254           .withMaxEntries(maxEntriesPerSegment)
255           .build();
256
257       currentSegment = createSegment(descriptor);
258       currentSegment.descriptor().update(System.currentTimeMillis());
259
260       segments.put(1L, currentSegment);
261     }
262   }
263
264   /**
265    * Asserts that the manager is open.
266    *
267    * @throws IllegalStateException if the segment manager is not open
268    */
269   private void assertOpen() {
270     checkState(currentSegment != null, "journal not open");
271   }
272
273   /**
274    * Asserts that enough disk space is available to allocate a new segment.
275    */
276   private void assertDiskSpace() {
277     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
278       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
279     }
280   }
281
282   /**
283    * Resets the current segment, creating a new segment if necessary.
284    */
285   private synchronized void resetCurrentSegment() {
286     JournalSegment lastSegment = getLastSegment();
287     if (lastSegment != null) {
288       currentSegment = lastSegment;
289     } else {
290       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
291           .withId(1)
292           .withIndex(1)
293           .withMaxSegmentSize(maxSegmentSize)
294           .withMaxEntries(maxEntriesPerSegment)
295           .build();
296
297       currentSegment = createSegment(descriptor);
298
299       segments.put(1L, currentSegment);
300     }
301   }
302
303   /**
304    * Resets and returns the first segment in the journal.
305    *
306    * @param index the starting index of the journal
307    * @return the first segment
308    */
309   JournalSegment resetSegments(long index) {
310     assertOpen();
311
312     // If the index already equals the first segment index, skip the reset.
313     JournalSegment firstSegment = getFirstSegment();
314     if (index == firstSegment.firstIndex()) {
315       return firstSegment;
316     }
317
318     for (JournalSegment segment : segments.values()) {
319       segment.close();
320       segment.delete();
321     }
322     segments.clear();
323
324     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
325         .withId(1)
326         .withIndex(index)
327         .withMaxSegmentSize(maxSegmentSize)
328         .withMaxEntries(maxEntriesPerSegment)
329         .build();
330     currentSegment = createSegment(descriptor);
331     segments.put(index, currentSegment);
332     return currentSegment;
333   }
334
335   /**
336    * Returns the first segment in the log.
337    *
338    * @throws IllegalStateException if the segment manager is not open
339    */
340   JournalSegment getFirstSegment() {
341     assertOpen();
342     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
343     return segment != null ? segment.getValue() : null;
344   }
345
346   /**
347    * Returns the last segment in the log.
348    *
349    * @throws IllegalStateException if the segment manager is not open
350    */
351   JournalSegment getLastSegment() {
352     assertOpen();
353     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
354     return segment != null ? segment.getValue() : null;
355   }
356
357   /**
358    * Creates and returns the next segment.
359    *
360    * @return The next segment.
361    * @throws IllegalStateException if the segment manager is not open
362    */
363   synchronized JournalSegment getNextSegment() {
364     assertOpen();
365     assertDiskSpace();
366
367     JournalSegment lastSegment = getLastSegment();
368     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
369         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
370         .withIndex(currentSegment.lastIndex() + 1)
371         .withMaxSegmentSize(maxSegmentSize)
372         .withMaxEntries(maxEntriesPerSegment)
373         .build();
374
375     currentSegment = createSegment(descriptor);
376
377     segments.put(descriptor.index(), currentSegment);
378     return currentSegment;
379   }
380
381   /**
382    * Returns the segment following the segment with the given ID.
383    *
384    * @param index The segment index with which to look up the next segment.
385    * @return The next segment for the given index.
386    */
387   JournalSegment getNextSegment(long index) {
388     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
389     return nextSegment != null ? nextSegment.getValue() : null;
390   }
391
392   /**
393    * Returns the segment for the given index.
394    *
395    * @param index The index for which to return the segment.
396    * @throws IllegalStateException if the segment manager is not open
397    */
398   synchronized JournalSegment getSegment(long index) {
399     assertOpen();
400     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
401     if (currentSegment != null && index > currentSegment.firstIndex()) {
402       return currentSegment;
403     }
404
405     // If the index is in another segment, get the entry with the next lowest first index.
406     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
407     if (segment != null) {
408       return segment.getValue();
409     }
410     return getFirstSegment();
411   }
412
413   /**
414    * Removes a segment.
415    *
416    * @param segment The segment to remove.
417    */
418   synchronized void removeSegment(JournalSegment segment) {
419     segments.remove(segment.firstIndex());
420     segment.close();
421     segment.delete();
422     resetCurrentSegment();
423   }
424
425   /**
426    * Creates a new segment.
427    */
428   JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
429     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
430
431     RandomAccessFile raf;
432     FileChannel channel;
433     try {
434       raf = new RandomAccessFile(segmentFile, "rw");
435       raf.setLength(descriptor.maxSegmentSize());
436       channel =  raf.getChannel();
437     } catch (IOException e) {
438       throw new StorageException(e);
439     }
440
441     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
442     descriptor.copyTo(buffer);
443     buffer.flip();
444     try {
445       channel.write(buffer);
446     } catch (IOException e) {
447       throw new StorageException(e);
448     } finally {
449       try {
450         channel.close();
451         raf.close();
452       } catch (IOException e) {
453       }
454     }
455     JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
456     LOG.debug("Created segment: {}", segment);
457     return segment;
458   }
459
460   /**
461    * Creates a new segment instance.
462    *
463    * @param segmentFile The segment file.
464    * @param descriptor The segment descriptor.
465    * @return The segment instance.
466    */
467   protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
468     return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
469   }
470
471   /**
472    * Loads a segment.
473    */
474   private JournalSegment loadSegment(long segmentId) {
475     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
476     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
477     try (FileChannel channel = openChannel(segmentFile)) {
478       channel.read(buffer);
479       buffer.flip();
480       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
481       JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
482       LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
483       return segment;
484     } catch (IOException e) {
485       throw new StorageException(e);
486     }
487   }
488
489   private FileChannel openChannel(File file) {
490     try {
491       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
492     } catch (IOException e) {
493       throw new StorageException(e);
494     }
495   }
496
497   /**
498    * Loads all segments from disk.
499    *
500    * @return A collection of segments for the log.
501    */
502   protected Collection<JournalSegment> loadSegments() {
503     // Ensure log directories are created.
504     directory.mkdirs();
505
506     TreeMap<Long, JournalSegment> segments = new TreeMap<>();
507
508     // Iterate through all files in the log directory.
509     for (File file : directory.listFiles(File::isFile)) {
510
511       // If the file looks like a segment file, attempt to load the segment.
512       if (JournalSegmentFile.isSegmentFile(name, file)) {
513         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
514         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
515         try (FileChannel channel = openChannel(file)) {
516           channel.read(buffer);
517           buffer.flip();
518         } catch (IOException e) {
519           throw new StorageException(e);
520         }
521
522         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
523
524         // Load the segment.
525         JournalSegment segment = loadSegment(descriptor.id());
526
527         // Add the segment to the segments list.
528         LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
529         segments.put(segment.firstIndex(), segment);
530       }
531     }
532
533     // Verify that all the segments in the log align with one another.
534     JournalSegment previousSegment = null;
535     boolean corrupted = false;
536     Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
537     while (iterator.hasNext()) {
538       JournalSegment segment = iterator.next().getValue();
539       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
540         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
541         corrupted = true;
542       }
543       if (corrupted) {
544         segment.close();
545         segment.delete();
546         iterator.remove();
547       }
548       previousSegment = segment;
549     }
550
551     return segments.values();
552   }
553
554   /**
555    * Resets journal readers to the given head.
556    *
557    * @param index The index at which to reset readers.
558    */
559   void resetHead(long index) {
560     for (SegmentedJournalReader<E> reader : readers) {
561       if (reader.getNextIndex() < index) {
562         reader.reset(index);
563       }
564     }
565   }
566
567   /**
568    * Resets journal readers to the given tail.
569    *
570    * @param index The index at which to reset readers.
571    */
572   void resetTail(long index) {
573     for (SegmentedJournalReader<E> reader : readers) {
574       if (reader.getNextIndex() >= index) {
575         reader.reset(index);
576       }
577     }
578   }
579
580   void closeReader(SegmentedJournalReader<E> reader) {
581     readers.remove(reader);
582   }
583
584   @Override
585   public boolean isOpen() {
586     return open;
587   }
588
589   /**
590    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
591    *
592    * @param index the index from which to remove segments
593    * @return indicates whether a segment can be removed from the journal
594    */
595   public boolean isCompactable(long index) {
596     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
597     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
598   }
599
600   /**
601    * Returns the index of the last segment in the log.
602    *
603    * @param index the compaction index
604    * @return the starting index of the last segment in the log
605    */
606   public long getCompactableIndex(long index) {
607     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
608     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
609   }
610
611   /**
612    * Compacts the journal up to the given index.
613    * <p>
614    * The semantics of compaction are not specified by this interface.
615    *
616    * @param index The index up to which to compact the journal.
617    */
618   public void compact(long index) {
619     final var segmentEntry = segments.floorEntry(index);
620     if (segmentEntry != null) {
621       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
622       if (!compactSegments.isEmpty()) {
623         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
624         for (JournalSegment segment : compactSegments.values()) {
625           LOG.trace("Deleting segment: {}", segment);
626           segment.close();
627           segment.delete();
628         }
629         compactSegments.clear();
630         resetHead(segmentEntry.getValue().firstIndex());
631       }
632     }
633   }
634
635   @Override
636   public void close() {
637     segments.values().forEach(segment -> {
638       LOG.debug("Closing segment: {}", segment);
639       segment.close();
640     });
641     currentSegment = null;
642     open = false;
643   }
644
645   /**
646    * Returns whether {@code flushOnCommit} is enabled for the log.
647    *
648    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
649    */
650   boolean isFlushOnCommit() {
651     return flushOnCommit;
652   }
653
654   /**
655    * Commits entries up to the given index.
656    *
657    * @param index The index up to which to commit entries.
658    */
659   void setCommitIndex(long index) {
660     this.commitIndex = index;
661   }
662
663   /**
664    * Returns the Raft log commit index.
665    *
666    * @return The Raft log commit index.
667    */
668   long getCommitIndex() {
669     return commitIndex;
670   }
671
672   /**
673    * Raft log builder.
674    */
675   public static final class Builder<E> {
676     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
677     private static final String DEFAULT_NAME = "atomix";
678     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
679     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
680     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
681     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
682     private static final double DEFAULT_INDEX_DENSITY = .005;
683
684     private String name = DEFAULT_NAME;
685     private StorageLevel storageLevel = StorageLevel.DISK;
686     private File directory = new File(DEFAULT_DIRECTORY);
687     private JournalSerdes namespace;
688     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
689     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
690     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
691     private double indexDensity = DEFAULT_INDEX_DENSITY;
692     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
693
694     protected Builder() {
695     }
696
697     /**
698      * Sets the storage name.
699      *
700      * @param name The storage name.
701      * @return The storage builder.
702      */
703     public Builder<E> withName(String name) {
704       this.name = requireNonNull(name, "name cannot be null");
705       return this;
706     }
707
708     /**
709      * Sets the log storage level, returning the builder for method chaining.
710      * <p>
711      * The storage level indicates how individual entries should be persisted in the journal.
712      *
713      * @param storageLevel The log storage level.
714      * @return The storage builder.
715      */
716     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
717       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
718       return this;
719     }
720
721     /**
722      * Sets the log directory, returning the builder for method chaining.
723      * <p>
724      * The log will write segment files into the provided directory.
725      *
726      * @param directory The log directory.
727      * @return The storage builder.
728      * @throws NullPointerException If the {@code directory} is {@code null}
729      */
730     public Builder<E> withDirectory(String directory) {
731       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
732     }
733
734     /**
735      * Sets the log directory, returning the builder for method chaining.
736      * <p>
737      * The log will write segment files into the provided directory.
738      *
739      * @param directory The log directory.
740      * @return The storage builder.
741      * @throws NullPointerException If the {@code directory} is {@code null}
742      */
743     public Builder<E> withDirectory(File directory) {
744       this.directory = requireNonNull(directory, "directory cannot be null");
745       return this;
746     }
747
748     /**
749      * Sets the journal namespace, returning the builder for method chaining.
750      *
751      * @param namespace The journal serializer.
752      * @return The journal builder.
753      */
754     public Builder<E> withNamespace(JournalSerdes namespace) {
755       this.namespace = requireNonNull(namespace, "namespace cannot be null");
756       return this;
757     }
758
759     /**
760      * Sets the maximum segment size in bytes, returning the builder for method chaining.
761      * <p>
762      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
763      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
764      * segment and append new entries to that segment.
765      * <p>
766      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
767      *
768      * @param maxSegmentSize The maximum segment size in bytes.
769      * @return The storage builder.
770      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
771      */
772     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
773       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
774       this.maxSegmentSize = maxSegmentSize;
775       return this;
776     }
777
778     /**
779      * Sets the maximum entry size in bytes, returning the builder for method chaining.
780      *
781      * @param maxEntrySize the maximum entry size in bytes
782      * @return the storage builder
783      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
784      */
785     public Builder<E> withMaxEntrySize(int maxEntrySize) {
786       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
787       this.maxEntrySize = maxEntrySize;
788       return this;
789     }
790
791     /**
792      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
793      * <p>
794      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
795      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
796      * new segment and append new entries to that segment.
797      * <p>
798      * By default, the maximum entries per segment is {@code 1024 * 1024}.
799      *
800      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
801      * @return The storage builder.
802      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
803      *     per segment
804      * @deprecated since 3.0.2
805      */
806     @Deprecated
807     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
808       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
809       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
810           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
811       this.maxEntriesPerSegment = maxEntriesPerSegment;
812       return this;
813     }
814
815     /**
816      * Sets the journal index density.
817      * <p>
818      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
819      * in-memory index for faster seeking.
820      *
821      * @param indexDensity the index density
822      * @return the journal builder
823      * @throws IllegalArgumentException if the density is not between 0 and 1
824      */
825     public Builder<E> withIndexDensity(double indexDensity) {
826       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
827       this.indexDensity = indexDensity;
828       return this;
829     }
830
831     /**
832      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
833      * chaining.
834      * <p>
835      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
836      * committed in a given segment.
837      *
838      * @return The storage builder.
839      */
840     public Builder<E> withFlushOnCommit() {
841       return withFlushOnCommit(true);
842     }
843
844     /**
845      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
846      * chaining.
847      * <p>
848      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
849      * committed in a given segment.
850      *
851      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
852      * @return The storage builder.
853      */
854     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
855       this.flushOnCommit = flushOnCommit;
856       return this;
857     }
858
859     /**
860      * Build the {@link SegmentedJournal}.
861      *
862      * @return A new {@link SegmentedJournal}.
863      */
864     public SegmentedJournal<E> build() {
865       return new SegmentedJournal<>(
866           name,
867           storageLevel,
868           directory,
869           namespace,
870           maxSegmentSize,
871           maxEntrySize,
872           maxEntriesPerSegment,
873           indexDensity,
874           flushOnCommit);
875     }
876   }
877 }