Improve segmented journal actor metrics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentNavigableMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import static com.google.common.base.Preconditions.checkArgument;
35 import static com.google.common.base.Preconditions.checkState;
36 import static java.util.Objects.requireNonNull;
37
38 /**
39  * Segmented journal.
40  */
41 public final class SegmentedJournal<E> implements Journal<E> {
42   /**
43    * Returns a new Raft log builder.
44    *
45    * @return A new Raft log builder.
46    */
47   public static <E> Builder<E> builder() {
48     return new Builder<>();
49   }
50
51   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
52   private static final int SEGMENT_BUFFER_FACTOR = 3;
53
54   private final String name;
55   private final StorageLevel storageLevel;
56   private final File directory;
57   private final JournalSerdes namespace;
58   private final int maxSegmentSize;
59   private final int maxEntrySize;
60   private final int maxEntriesPerSegment;
61   private final double indexDensity;
62   private final boolean flushOnCommit;
63   private final SegmentedJournalWriter<E> writer;
64   private volatile long commitIndex;
65
66   private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
67   private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
68   private JournalSegment<E> currentSegment;
69
70   private volatile boolean open = true;
71
72   public SegmentedJournal(
73       String name,
74       StorageLevel storageLevel,
75       File directory,
76       JournalSerdes namespace,
77       int maxSegmentSize,
78       int maxEntrySize,
79       int maxEntriesPerSegment,
80       double indexDensity,
81       boolean flushOnCommit) {
82     this.name = requireNonNull(name, "name cannot be null");
83     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
84     this.directory = requireNonNull(directory, "directory cannot be null");
85     this.namespace = requireNonNull(namespace, "namespace cannot be null");
86     this.maxSegmentSize = maxSegmentSize;
87     this.maxEntrySize = maxEntrySize;
88     this.maxEntriesPerSegment = maxEntriesPerSegment;
89     this.indexDensity = indexDensity;
90     this.flushOnCommit = flushOnCommit;
91     open();
92     this.writer = new SegmentedJournalWriter<>(this);
93   }
94
95   /**
96    * Returns the segment file name prefix.
97    *
98    * @return The segment file name prefix.
99    */
100   public String name() {
101     return name;
102   }
103
104   /**
105    * Returns the storage directory.
106    * <p>
107    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
108    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
109    * when the log is opened.
110    *
111    * @return The storage directory.
112    */
113   public File directory() {
114     return directory;
115   }
116
117   /**
118    * Returns the storage level.
119    * <p>
120    * The storage level dictates how entries within individual journal segments should be stored.
121    *
122    * @return The storage level.
123    */
124   public StorageLevel storageLevel() {
125     return storageLevel;
126   }
127
128   /**
129    * Returns the maximum journal segment size.
130    * <p>
131    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
132    *
133    * @return The maximum segment size in bytes.
134    */
135   public int maxSegmentSize() {
136     return maxSegmentSize;
137   }
138
139   /**
140    * Returns the maximum journal entry size.
141    * <p>
142    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
143    *
144    * @return the maximum entry size in bytes
145    */
146   public int maxEntrySize() {
147     return maxEntrySize;
148   }
149
150   /**
151    * Returns the maximum number of entries per segment.
152    * <p>
153    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
154    * in a journal.
155    *
156    * @return The maximum number of entries per segment.
157    * @deprecated since 3.0.2
158    */
159   @Deprecated
160   public int maxEntriesPerSegment() {
161     return maxEntriesPerSegment;
162   }
163
164   /**
165    * Returns the collection of journal segments.
166    *
167    * @return the collection of journal segments
168    */
169   public Collection<JournalSegment<E>> segments() {
170     return segments.values();
171   }
172
173   /**
174    * Returns the collection of journal segments with indexes greater than the given index.
175    *
176    * @param index the starting index
177    * @return the journal segments starting with indexes greater than or equal to the given index
178    */
179   public Collection<JournalSegment<E>> segments(long index) {
180     return segments.tailMap(index).values();
181   }
182
183   /**
184    * Returns the total size of the journal.
185    *
186    * @return the total size of the journal
187    */
188   public long size() {
189     return segments.values().stream()
190         .mapToLong(segment -> segment.size())
191         .sum();
192   }
193
194   @Override
195   public JournalWriter<E> writer() {
196     return writer;
197   }
198
199   @Override
200   public JournalReader<E> openReader(long index) {
201     return openReader(index, JournalReader.Mode.ALL);
202   }
203
204   /**
205    * Opens a new Raft log reader with the given reader mode.
206    *
207    * @param index The index from which to begin reading entries.
208    * @param mode The mode in which to read entries.
209    * @return The Raft log reader.
210    */
211   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
212     final var segment = getSegment(index);
213     final var reader = switch (mode) {
214       case ALL -> new SegmentedJournalReader<>(this, segment);
215       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
216     };
217
218     // Forward reader to specified index
219     long next = reader.getNextIndex();
220     while (index > next && reader.tryNext() != null) {
221         next = reader.getNextIndex();
222     }
223
224     readers.add(reader);
225     return reader;
226   }
227
228   /**
229    * Opens the segments.
230    */
231   private synchronized void open() {
232     // Load existing log segments from disk.
233     for (JournalSegment<E> segment : loadSegments()) {
234       segments.put(segment.descriptor().index(), segment);
235     }
236
237     // If a segment doesn't already exist, create an initial segment starting at index 1.
238     if (!segments.isEmpty()) {
239       currentSegment = segments.lastEntry().getValue();
240     } else {
241       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
242           .withId(1)
243           .withIndex(1)
244           .withMaxSegmentSize(maxSegmentSize)
245           .withMaxEntries(maxEntriesPerSegment)
246           .build();
247
248       currentSegment = createSegment(descriptor);
249       currentSegment.descriptor().update(System.currentTimeMillis());
250
251       segments.put(1L, currentSegment);
252     }
253   }
254
255   /**
256    * Asserts that the manager is open.
257    *
258    * @throws IllegalStateException if the segment manager is not open
259    */
260   private void assertOpen() {
261     checkState(currentSegment != null, "journal not open");
262   }
263
264   /**
265    * Asserts that enough disk space is available to allocate a new segment.
266    */
267   private void assertDiskSpace() {
268     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
269       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
270     }
271   }
272
273   /**
274    * Resets the current segment, creating a new segment if necessary.
275    */
276   private synchronized void resetCurrentSegment() {
277     JournalSegment<E> lastSegment = getLastSegment();
278     if (lastSegment != null) {
279       currentSegment = lastSegment;
280     } else {
281       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
282           .withId(1)
283           .withIndex(1)
284           .withMaxSegmentSize(maxSegmentSize)
285           .withMaxEntries(maxEntriesPerSegment)
286           .build();
287
288       currentSegment = createSegment(descriptor);
289
290       segments.put(1L, currentSegment);
291     }
292   }
293
294   /**
295    * Resets and returns the first segment in the journal.
296    *
297    * @param index the starting index of the journal
298    * @return the first segment
299    */
300   JournalSegment<E> resetSegments(long index) {
301     assertOpen();
302
303     // If the index already equals the first segment index, skip the reset.
304     JournalSegment<E> firstSegment = getFirstSegment();
305     if (index == firstSegment.firstIndex()) {
306       return firstSegment;
307     }
308
309     for (JournalSegment<E> segment : segments.values()) {
310       segment.close();
311       segment.delete();
312     }
313     segments.clear();
314
315     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
316         .withId(1)
317         .withIndex(index)
318         .withMaxSegmentSize(maxSegmentSize)
319         .withMaxEntries(maxEntriesPerSegment)
320         .build();
321     currentSegment = createSegment(descriptor);
322     segments.put(index, currentSegment);
323     return currentSegment;
324   }
325
326   /**
327    * Returns the first segment in the log.
328    *
329    * @throws IllegalStateException if the segment manager is not open
330    */
331   JournalSegment<E> getFirstSegment() {
332     assertOpen();
333     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
334     return segment != null ? segment.getValue() : null;
335   }
336
337   /**
338    * Returns the last segment in the log.
339    *
340    * @throws IllegalStateException if the segment manager is not open
341    */
342   JournalSegment<E> getLastSegment() {
343     assertOpen();
344     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
345     return segment != null ? segment.getValue() : null;
346   }
347
348   /**
349    * Creates and returns the next segment.
350    *
351    * @return The next segment.
352    * @throws IllegalStateException if the segment manager is not open
353    */
354   synchronized JournalSegment<E> getNextSegment() {
355     assertOpen();
356     assertDiskSpace();
357
358     JournalSegment<E> lastSegment = getLastSegment();
359     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
360         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
361         .withIndex(currentSegment.lastIndex() + 1)
362         .withMaxSegmentSize(maxSegmentSize)
363         .withMaxEntries(maxEntriesPerSegment)
364         .build();
365
366     currentSegment = createSegment(descriptor);
367
368     segments.put(descriptor.index(), currentSegment);
369     return currentSegment;
370   }
371
372   /**
373    * Returns the segment following the segment with the given ID.
374    *
375    * @param index The segment index with which to look up the next segment.
376    * @return The next segment for the given index.
377    */
378   JournalSegment<E> getNextSegment(long index) {
379     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
380     return nextSegment != null ? nextSegment.getValue() : null;
381   }
382
383   /**
384    * Returns the segment for the given index.
385    *
386    * @param index The index for which to return the segment.
387    * @throws IllegalStateException if the segment manager is not open
388    */
389   synchronized JournalSegment<E> getSegment(long index) {
390     assertOpen();
391     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
392     if (currentSegment != null && index > currentSegment.firstIndex()) {
393       return currentSegment;
394     }
395
396     // If the index is in another segment, get the entry with the next lowest first index.
397     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
398     if (segment != null) {
399       return segment.getValue();
400     }
401     return getFirstSegment();
402   }
403
404   /**
405    * Removes a segment.
406    *
407    * @param segment The segment to remove.
408    */
409   synchronized void removeSegment(JournalSegment<E> segment) {
410     segments.remove(segment.firstIndex());
411     segment.close();
412     segment.delete();
413     resetCurrentSegment();
414   }
415
416   /**
417    * Creates a new segment.
418    */
419   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
420     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
421
422     RandomAccessFile raf;
423     FileChannel channel;
424     try {
425       raf = new RandomAccessFile(segmentFile, "rw");
426       raf.setLength(descriptor.maxSegmentSize());
427       channel =  raf.getChannel();
428     } catch (IOException e) {
429       throw new StorageException(e);
430     }
431
432     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
433     descriptor.copyTo(buffer);
434     buffer.flip();
435     try {
436       channel.write(buffer);
437     } catch (IOException e) {
438       throw new StorageException(e);
439     } finally {
440       try {
441         channel.close();
442         raf.close();
443       } catch (IOException e) {
444       }
445     }
446     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
447     LOG.debug("Created segment: {}", segment);
448     return segment;
449   }
450
451   /**
452    * Creates a new segment instance.
453    *
454    * @param segmentFile The segment file.
455    * @param descriptor The segment descriptor.
456    * @return The segment instance.
457    */
458   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
459     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
460   }
461
462   /**
463    * Loads a segment.
464    */
465   private JournalSegment<E> loadSegment(long segmentId) {
466     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
467     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
468     try (FileChannel channel = openChannel(segmentFile)) {
469       channel.read(buffer);
470       buffer.flip();
471       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
472       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
473       LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
474       return segment;
475     } catch (IOException e) {
476       throw new StorageException(e);
477     }
478   }
479
480   private FileChannel openChannel(File file) {
481     try {
482       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
483     } catch (IOException e) {
484       throw new StorageException(e);
485     }
486   }
487
488   /**
489    * Loads all segments from disk.
490    *
491    * @return A collection of segments for the log.
492    */
493   protected Collection<JournalSegment<E>> loadSegments() {
494     // Ensure log directories are created.
495     directory.mkdirs();
496
497     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
498
499     // Iterate through all files in the log directory.
500     for (File file : directory.listFiles(File::isFile)) {
501
502       // If the file looks like a segment file, attempt to load the segment.
503       if (JournalSegmentFile.isSegmentFile(name, file)) {
504         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
505         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
506         try (FileChannel channel = openChannel(file)) {
507           channel.read(buffer);
508           buffer.flip();
509         } catch (IOException e) {
510           throw new StorageException(e);
511         }
512
513         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
514
515         // Load the segment.
516         JournalSegment<E> segment = loadSegment(descriptor.id());
517
518         // Add the segment to the segments list.
519         LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
520         segments.put(segment.firstIndex(), segment);
521       }
522     }
523
524     // Verify that all the segments in the log align with one another.
525     JournalSegment<E> previousSegment = null;
526     boolean corrupted = false;
527     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
528     while (iterator.hasNext()) {
529       JournalSegment<E> segment = iterator.next().getValue();
530       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
531         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
532         corrupted = true;
533       }
534       if (corrupted) {
535         segment.close();
536         segment.delete();
537         iterator.remove();
538       }
539       previousSegment = segment;
540     }
541
542     return segments.values();
543   }
544
545   /**
546    * Resets journal readers to the given head.
547    *
548    * @param index The index at which to reset readers.
549    */
550   void resetHead(long index) {
551     for (SegmentedJournalReader<E> reader : readers) {
552       if (reader.getNextIndex() < index) {
553         reader.reset(index);
554       }
555     }
556   }
557
558   /**
559    * Resets journal readers to the given tail.
560    *
561    * @param index The index at which to reset readers.
562    */
563   void resetTail(long index) {
564     for (SegmentedJournalReader<E> reader : readers) {
565       if (reader.getNextIndex() >= index) {
566         reader.reset(index);
567       }
568     }
569   }
570
571   void closeReader(SegmentedJournalReader<E> reader) {
572     readers.remove(reader);
573   }
574
575   @Override
576   public boolean isOpen() {
577     return open;
578   }
579
580   /**
581    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
582    *
583    * @param index the index from which to remove segments
584    * @return indicates whether a segment can be removed from the journal
585    */
586   public boolean isCompactable(long index) {
587     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
588     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
589   }
590
591   /**
592    * Returns the index of the last segment in the log.
593    *
594    * @param index the compaction index
595    * @return the starting index of the last segment in the log
596    */
597   public long getCompactableIndex(long index) {
598     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
599     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
600   }
601
602   /**
603    * Compacts the journal up to the given index.
604    * <p>
605    * The semantics of compaction are not specified by this interface.
606    *
607    * @param index The index up to which to compact the journal.
608    */
609   public void compact(long index) {
610     final var segmentEntry = segments.floorEntry(index);
611     if (segmentEntry != null) {
612       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
613       if (!compactSegments.isEmpty()) {
614         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
615         for (JournalSegment<E> segment : compactSegments.values()) {
616           LOG.trace("Deleting segment: {}", segment);
617           segment.close();
618           segment.delete();
619         }
620         compactSegments.clear();
621         resetHead(segmentEntry.getValue().firstIndex());
622       }
623     }
624   }
625
626   @Override
627   public void close() {
628     segments.values().forEach(segment -> {
629       LOG.debug("Closing segment: {}", segment);
630       segment.close();
631     });
632     currentSegment = null;
633     open = false;
634   }
635
636   /**
637    * Returns whether {@code flushOnCommit} is enabled for the log.
638    *
639    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
640    */
641   boolean isFlushOnCommit() {
642     return flushOnCommit;
643   }
644
645   /**
646    * Commits entries up to the given index.
647    *
648    * @param index The index up to which to commit entries.
649    */
650   void setCommitIndex(long index) {
651     this.commitIndex = index;
652   }
653
654   /**
655    * Returns the Raft log commit index.
656    *
657    * @return The Raft log commit index.
658    */
659   long getCommitIndex() {
660     return commitIndex;
661   }
662
663   /**
664    * Raft log builder.
665    */
666   public static final class Builder<E> {
667     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
668     private static final String DEFAULT_NAME = "atomix";
669     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
670     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
671     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
672     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
673     private static final double DEFAULT_INDEX_DENSITY = .005;
674
675     private String name = DEFAULT_NAME;
676     private StorageLevel storageLevel = StorageLevel.DISK;
677     private File directory = new File(DEFAULT_DIRECTORY);
678     private JournalSerdes namespace;
679     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
680     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
681     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
682     private double indexDensity = DEFAULT_INDEX_DENSITY;
683     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
684
685     protected Builder() {
686     }
687
688     /**
689      * Sets the storage name.
690      *
691      * @param name The storage name.
692      * @return The storage builder.
693      */
694     public Builder<E> withName(String name) {
695       this.name = requireNonNull(name, "name cannot be null");
696       return this;
697     }
698
699     /**
700      * Sets the log storage level, returning the builder for method chaining.
701      * <p>
702      * The storage level indicates how individual entries should be persisted in the journal.
703      *
704      * @param storageLevel The log storage level.
705      * @return The storage builder.
706      */
707     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
708       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
709       return this;
710     }
711
712     /**
713      * Sets the log directory, returning the builder for method chaining.
714      * <p>
715      * The log will write segment files into the provided directory.
716      *
717      * @param directory The log directory.
718      * @return The storage builder.
719      * @throws NullPointerException If the {@code directory} is {@code null}
720      */
721     public Builder<E> withDirectory(String directory) {
722       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
723     }
724
725     /**
726      * Sets the log directory, returning the builder for method chaining.
727      * <p>
728      * The log will write segment files into the provided directory.
729      *
730      * @param directory The log directory.
731      * @return The storage builder.
732      * @throws NullPointerException If the {@code directory} is {@code null}
733      */
734     public Builder<E> withDirectory(File directory) {
735       this.directory = requireNonNull(directory, "directory cannot be null");
736       return this;
737     }
738
739     /**
740      * Sets the journal namespace, returning the builder for method chaining.
741      *
742      * @param namespace The journal serializer.
743      * @return The journal builder.
744      */
745     public Builder<E> withNamespace(JournalSerdes namespace) {
746       this.namespace = requireNonNull(namespace, "namespace cannot be null");
747       return this;
748     }
749
750     /**
751      * Sets the maximum segment size in bytes, returning the builder for method chaining.
752      * <p>
753      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
754      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
755      * segment and append new entries to that segment.
756      * <p>
757      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
758      *
759      * @param maxSegmentSize The maximum segment size in bytes.
760      * @return The storage builder.
761      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
762      */
763     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
764       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
765       this.maxSegmentSize = maxSegmentSize;
766       return this;
767     }
768
769     /**
770      * Sets the maximum entry size in bytes, returning the builder for method chaining.
771      *
772      * @param maxEntrySize the maximum entry size in bytes
773      * @return the storage builder
774      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
775      */
776     public Builder<E> withMaxEntrySize(int maxEntrySize) {
777       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
778       this.maxEntrySize = maxEntrySize;
779       return this;
780     }
781
782     /**
783      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
784      * <p>
785      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
786      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
787      * new segment and append new entries to that segment.
788      * <p>
789      * By default, the maximum entries per segment is {@code 1024 * 1024}.
790      *
791      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
792      * @return The storage builder.
793      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
794      *     per segment
795      * @deprecated since 3.0.2
796      */
797     @Deprecated
798     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
799       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
800       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
801           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
802       this.maxEntriesPerSegment = maxEntriesPerSegment;
803       return this;
804     }
805
806     /**
807      * Sets the journal index density.
808      * <p>
809      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
810      * in-memory index for faster seeking.
811      *
812      * @param indexDensity the index density
813      * @return the journal builder
814      * @throws IllegalArgumentException if the density is not between 0 and 1
815      */
816     public Builder<E> withIndexDensity(double indexDensity) {
817       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
818       this.indexDensity = indexDensity;
819       return this;
820     }
821
822     /**
823      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
824      * chaining.
825      * <p>
826      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
827      * committed in a given segment.
828      *
829      * @return The storage builder.
830      */
831     public Builder<E> withFlushOnCommit() {
832       return withFlushOnCommit(true);
833     }
834
835     /**
836      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
837      * chaining.
838      * <p>
839      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
840      * committed in a given segment.
841      *
842      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
843      * @return The storage builder.
844      */
845     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
846       this.flushOnCommit = flushOnCommit;
847       return this;
848     }
849
850     /**
851      * Build the {@link SegmentedJournal}.
852      *
853      * @return A new {@link SegmentedJournal}.
854      */
855     public SegmentedJournal<E> build() {
856       return new SegmentedJournal<>(
857           name,
858           storageLevel,
859           directory,
860           namespace,
861           maxSegmentSize,
862           maxEntrySize,
863           maxEntriesPerSegment,
864           indexDensity,
865           flushOnCommit);
866     }
867   }
868 }