f2d99eec533a094f36ef82476d8094f7f2ddd5cb
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.SortedMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentNavigableMap;
31 import java.util.concurrent.ConcurrentSkipListMap;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import static com.google.common.base.Preconditions.checkArgument;
36 import static com.google.common.base.Preconditions.checkState;
37 import static java.util.Objects.requireNonNull;
38
39 /**
40  * Segmented journal.
41  */
42 public final class SegmentedJournal<E> implements Journal<E> {
43
44   /**
45    * Returns a new Raft log builder.
46    *
47    * @return A new Raft log builder.
48    */
49   public static <E> Builder<E> builder() {
50     return new Builder<>();
51   }
52
53   private static final int SEGMENT_BUFFER_FACTOR = 3;
54
55   private final Logger log = LoggerFactory.getLogger(getClass());
56   private final String name;
57   private final StorageLevel storageLevel;
58   private final File directory;
59   private final JournalSerdes namespace;
60   private final int maxSegmentSize;
61   private final int maxEntrySize;
62   private final int maxEntriesPerSegment;
63   private final double indexDensity;
64   private final boolean flushOnCommit;
65   private final SegmentedJournalWriter<E> writer;
66   private volatile long commitIndex;
67
68   private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
69   private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
70   private JournalSegment<E> currentSegment;
71
72   private volatile boolean open = true;
73
74   public SegmentedJournal(
75       String name,
76       StorageLevel storageLevel,
77       File directory,
78       JournalSerdes namespace,
79       int maxSegmentSize,
80       int maxEntrySize,
81       int maxEntriesPerSegment,
82       double indexDensity,
83       boolean flushOnCommit) {
84     this.name = requireNonNull(name, "name cannot be null");
85     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
86     this.directory = requireNonNull(directory, "directory cannot be null");
87     this.namespace = requireNonNull(namespace, "namespace cannot be null");
88     this.maxSegmentSize = maxSegmentSize;
89     this.maxEntrySize = maxEntrySize;
90     this.maxEntriesPerSegment = maxEntriesPerSegment;
91     this.indexDensity = indexDensity;
92     this.flushOnCommit = flushOnCommit;
93     open();
94     this.writer = openWriter();
95   }
96
97   /**
98    * Returns the segment file name prefix.
99    *
100    * @return The segment file name prefix.
101    */
102   public String name() {
103     return name;
104   }
105
106   /**
107    * Returns the storage directory.
108    * <p>
109    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
110    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
111    * when the log is opened.
112    *
113    * @return The storage directory.
114    */
115   public File directory() {
116     return directory;
117   }
118
119   /**
120    * Returns the storage level.
121    * <p>
122    * The storage level dictates how entries within individual journal segments should be stored.
123    *
124    * @return The storage level.
125    */
126   public StorageLevel storageLevel() {
127     return storageLevel;
128   }
129
130   /**
131    * Returns the maximum journal segment size.
132    * <p>
133    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
134    *
135    * @return The maximum segment size in bytes.
136    */
137   public int maxSegmentSize() {
138     return maxSegmentSize;
139   }
140
141   /**
142    * Returns the maximum journal entry size.
143    * <p>
144    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
145    *
146    * @return the maximum entry size in bytes
147    */
148   public int maxEntrySize() {
149     return maxEntrySize;
150   }
151
152   /**
153    * Returns the maximum number of entries per segment.
154    * <p>
155    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
156    * in a journal.
157    *
158    * @return The maximum number of entries per segment.
159    * @deprecated since 3.0.2
160    */
161   @Deprecated
162   public int maxEntriesPerSegment() {
163     return maxEntriesPerSegment;
164   }
165
166   /**
167    * Returns the collection of journal segments.
168    *
169    * @return the collection of journal segments
170    */
171   public Collection<JournalSegment<E>> segments() {
172     return segments.values();
173   }
174
175   /**
176    * Returns the collection of journal segments with indexes greater than the given index.
177    *
178    * @param index the starting index
179    * @return the journal segments starting with indexes greater than or equal to the given index
180    */
181   public Collection<JournalSegment<E>> segments(long index) {
182     return segments.tailMap(index).values();
183   }
184
185   /**
186    * Returns the total size of the journal.
187    *
188    * @return the total size of the journal
189    */
190   public long size() {
191     return segments.values().stream()
192         .mapToLong(segment -> segment.size())
193         .sum();
194   }
195
196   @Override
197   public SegmentedJournalWriter<E> writer() {
198     return writer;
199   }
200
201   @Override
202   public SegmentedJournalReader<E> openReader(long index) {
203     return openReader(index, SegmentedJournalReader.Mode.ALL);
204   }
205
206   /**
207    * Opens a new Raft log reader with the given reader mode.
208    *
209    * @param index The index from which to begin reading entries.
210    * @param mode The mode in which to read entries.
211    * @return The Raft log reader.
212    */
213   public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
214     SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
215     readers.add(reader);
216     return reader;
217   }
218
219   /**
220    * Opens a new journal writer.
221    *
222    * @return A new journal writer.
223    */
224   protected SegmentedJournalWriter<E> openWriter() {
225     return new SegmentedJournalWriter<>(this);
226   }
227
228   /**
229    * Opens the segments.
230    */
231   private synchronized void open() {
232     // Load existing log segments from disk.
233     for (JournalSegment<E> segment : loadSegments()) {
234       segments.put(segment.descriptor().index(), segment);
235     }
236
237     // If a segment doesn't already exist, create an initial segment starting at index 1.
238     if (!segments.isEmpty()) {
239       currentSegment = segments.lastEntry().getValue();
240     } else {
241       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
242           .withId(1)
243           .withIndex(1)
244           .withMaxSegmentSize(maxSegmentSize)
245           .withMaxEntries(maxEntriesPerSegment)
246           .build();
247
248       currentSegment = createSegment(descriptor);
249       currentSegment.descriptor().update(System.currentTimeMillis());
250
251       segments.put(1L, currentSegment);
252     }
253   }
254
255   /**
256    * Asserts that the manager is open.
257    *
258    * @throws IllegalStateException if the segment manager is not open
259    */
260   private void assertOpen() {
261     checkState(currentSegment != null, "journal not open");
262   }
263
264   /**
265    * Asserts that enough disk space is available to allocate a new segment.
266    */
267   private void assertDiskSpace() {
268     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
269       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
270     }
271   }
272
273   /**
274    * Resets the current segment, creating a new segment if necessary.
275    */
276   private synchronized void resetCurrentSegment() {
277     JournalSegment<E> lastSegment = getLastSegment();
278     if (lastSegment != null) {
279       currentSegment = lastSegment;
280     } else {
281       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
282           .withId(1)
283           .withIndex(1)
284           .withMaxSegmentSize(maxSegmentSize)
285           .withMaxEntries(maxEntriesPerSegment)
286           .build();
287
288       currentSegment = createSegment(descriptor);
289
290       segments.put(1L, currentSegment);
291     }
292   }
293
294   /**
295    * Resets and returns the first segment in the journal.
296    *
297    * @param index the starting index of the journal
298    * @return the first segment
299    */
300   JournalSegment<E> resetSegments(long index) {
301     assertOpen();
302
303     // If the index already equals the first segment index, skip the reset.
304     JournalSegment<E> firstSegment = getFirstSegment();
305     if (index == firstSegment.index()) {
306       return firstSegment;
307     }
308
309     for (JournalSegment<E> segment : segments.values()) {
310       segment.close();
311       segment.delete();
312     }
313     segments.clear();
314
315     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
316         .withId(1)
317         .withIndex(index)
318         .withMaxSegmentSize(maxSegmentSize)
319         .withMaxEntries(maxEntriesPerSegment)
320         .build();
321     currentSegment = createSegment(descriptor);
322     segments.put(index, currentSegment);
323     return currentSegment;
324   }
325
326   /**
327    * Returns the first segment in the log.
328    *
329    * @throws IllegalStateException if the segment manager is not open
330    */
331   JournalSegment<E> getFirstSegment() {
332     assertOpen();
333     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
334     return segment != null ? segment.getValue() : null;
335   }
336
337   /**
338    * Returns the last segment in the log.
339    *
340    * @throws IllegalStateException if the segment manager is not open
341    */
342   JournalSegment<E> getLastSegment() {
343     assertOpen();
344     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
345     return segment != null ? segment.getValue() : null;
346   }
347
348   /**
349    * Creates and returns the next segment.
350    *
351    * @return The next segment.
352    * @throws IllegalStateException if the segment manager is not open
353    */
354   synchronized JournalSegment<E> getNextSegment() {
355     assertOpen();
356     assertDiskSpace();
357
358     JournalSegment<E> lastSegment = getLastSegment();
359     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
360         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
361         .withIndex(currentSegment.lastIndex() + 1)
362         .withMaxSegmentSize(maxSegmentSize)
363         .withMaxEntries(maxEntriesPerSegment)
364         .build();
365
366     currentSegment = createSegment(descriptor);
367
368     segments.put(descriptor.index(), currentSegment);
369     return currentSegment;
370   }
371
372   /**
373    * Returns the segment following the segment with the given ID.
374    *
375    * @param index The segment index with which to look up the next segment.
376    * @return The next segment for the given index.
377    */
378   JournalSegment<E> getNextSegment(long index) {
379     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
380     return nextSegment != null ? nextSegment.getValue() : null;
381   }
382
383   /**
384    * Returns the segment for the given index.
385    *
386    * @param index The index for which to return the segment.
387    * @throws IllegalStateException if the segment manager is not open
388    */
389   synchronized JournalSegment<E> getSegment(long index) {
390     assertOpen();
391     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
392     if (currentSegment != null && index > currentSegment.index()) {
393       return currentSegment;
394     }
395
396     // If the index is in another segment, get the entry with the next lowest first index.
397     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
398     if (segment != null) {
399       return segment.getValue();
400     }
401     return getFirstSegment();
402   }
403
404   /**
405    * Removes a segment.
406    *
407    * @param segment The segment to remove.
408    */
409   synchronized void removeSegment(JournalSegment<E> segment) {
410     segments.remove(segment.index());
411     segment.close();
412     segment.delete();
413     resetCurrentSegment();
414   }
415
416   /**
417    * Creates a new segment.
418    */
419   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
420     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
421
422     RandomAccessFile raf;
423     FileChannel channel;
424     try {
425       raf = new RandomAccessFile(segmentFile, "rw");
426       raf.setLength(descriptor.maxSegmentSize());
427       channel =  raf.getChannel();
428     } catch (IOException e) {
429       throw new StorageException(e);
430     }
431
432     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
433     descriptor.copyTo(buffer);
434     buffer.flip();
435     try {
436       channel.write(buffer);
437     } catch (IOException e) {
438       throw new StorageException(e);
439     } finally {
440       try {
441         channel.close();
442         raf.close();
443       } catch (IOException e) {
444       }
445     }
446     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
447     log.debug("Created segment: {}", segment);
448     return segment;
449   }
450
451   /**
452    * Creates a new segment instance.
453    *
454    * @param segmentFile The segment file.
455    * @param descriptor The segment descriptor.
456    * @return The segment instance.
457    */
458   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
459     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
460   }
461
462   /**
463    * Loads a segment.
464    */
465   private JournalSegment<E> loadSegment(long segmentId) {
466     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
467     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
468     try (FileChannel channel = openChannel(segmentFile)) {
469       channel.read(buffer);
470       buffer.flip();
471       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
472       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
473       log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
474       return segment;
475     } catch (IOException e) {
476       throw new StorageException(e);
477     }
478   }
479
480   private FileChannel openChannel(File file) {
481     try {
482       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
483     } catch (IOException e) {
484       throw new StorageException(e);
485     }
486   }
487
488   /**
489    * Loads all segments from disk.
490    *
491    * @return A collection of segments for the log.
492    */
493   protected Collection<JournalSegment<E>> loadSegments() {
494     // Ensure log directories are created.
495     directory.mkdirs();
496
497     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
498
499     // Iterate through all files in the log directory.
500     for (File file : directory.listFiles(File::isFile)) {
501
502       // If the file looks like a segment file, attempt to load the segment.
503       if (JournalSegmentFile.isSegmentFile(name, file)) {
504         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
505         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
506         try (FileChannel channel = openChannel(file)) {
507           channel.read(buffer);
508           buffer.flip();
509         } catch (IOException e) {
510           throw new StorageException(e);
511         }
512
513         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
514
515         // Load the segment.
516         JournalSegment<E> segment = loadSegment(descriptor.id());
517
518         // Add the segment to the segments list.
519         log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
520         segments.put(segment.index(), segment);
521       }
522     }
523
524     // Verify that all the segments in the log align with one another.
525     JournalSegment<E> previousSegment = null;
526     boolean corrupted = false;
527     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
528     while (iterator.hasNext()) {
529       JournalSegment<E> segment = iterator.next().getValue();
530       if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
531         log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
532         corrupted = true;
533       }
534       if (corrupted) {
535         segment.close();
536         segment.delete();
537         iterator.remove();
538       }
539       previousSegment = segment;
540     }
541
542     return segments.values();
543   }
544
545   /**
546    * Resets journal readers to the given head.
547    *
548    * @param index The index at which to reset readers.
549    */
550   void resetHead(long index) {
551     for (SegmentedJournalReader<E> reader : readers) {
552       if (reader.getNextIndex() < index) {
553         reader.reset(index);
554       }
555     }
556   }
557
558   /**
559    * Resets journal readers to the given tail.
560    *
561    * @param index The index at which to reset readers.
562    */
563   void resetTail(long index) {
564     for (SegmentedJournalReader<E> reader : readers) {
565       if (reader.getNextIndex() >= index) {
566         reader.reset(index);
567       }
568     }
569   }
570
571   void closeReader(SegmentedJournalReader<E> reader) {
572     readers.remove(reader);
573   }
574
575   @Override
576   public boolean isOpen() {
577     return open;
578   }
579
580   /**
581    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
582    *
583    * @param index the index from which to remove segments
584    * @return indicates whether a segment can be removed from the journal
585    */
586   public boolean isCompactable(long index) {
587     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
588     return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
589   }
590
591   /**
592    * Returns the index of the last segment in the log.
593    *
594    * @param index the compaction index
595    * @return the starting index of the last segment in the log
596    */
597   public long getCompactableIndex(long index) {
598     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
599     return segmentEntry != null ? segmentEntry.getValue().index() : 0;
600   }
601
602   /**
603    * Compacts the journal up to the given index.
604    * <p>
605    * The semantics of compaction are not specified by this interface.
606    *
607    * @param index The index up to which to compact the journal.
608    */
609   public void compact(long index) {
610     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
611     if (segmentEntry != null) {
612       SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
613       if (!compactSegments.isEmpty()) {
614         log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
615         for (JournalSegment<E> segment : compactSegments.values()) {
616           log.trace("Deleting segment: {}", segment);
617           segment.close();
618           segment.delete();
619         }
620         compactSegments.clear();
621         resetHead(segmentEntry.getValue().index());
622       }
623     }
624   }
625
626   @Override
627   public void close() {
628     segments.values().forEach(segment -> {
629       log.debug("Closing segment: {}", segment);
630       segment.close();
631     });
632     currentSegment = null;
633     open = false;
634   }
635
636   /**
637    * Returns whether {@code flushOnCommit} is enabled for the log.
638    *
639    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
640    */
641   boolean isFlushOnCommit() {
642     return flushOnCommit;
643   }
644
645   /**
646    * Commits entries up to the given index.
647    *
648    * @param index The index up to which to commit entries.
649    */
650   void setCommitIndex(long index) {
651     this.commitIndex = index;
652   }
653
654   /**
655    * Returns the Raft log commit index.
656    *
657    * @return The Raft log commit index.
658    */
659   long getCommitIndex() {
660     return commitIndex;
661   }
662
663   /**
664    * Raft log builder.
665    */
666   public static final class Builder<E> {
667     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
668     private static final String DEFAULT_NAME = "atomix";
669     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
670     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
671     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
672     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
673     private static final double DEFAULT_INDEX_DENSITY = .005;
674
675     private String name = DEFAULT_NAME;
676     private StorageLevel storageLevel = StorageLevel.DISK;
677     private File directory = new File(DEFAULT_DIRECTORY);
678     private JournalSerdes namespace;
679     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
680     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
681     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
682     private double indexDensity = DEFAULT_INDEX_DENSITY;
683     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
684
685     protected Builder() {
686     }
687
688     /**
689      * Sets the storage name.
690      *
691      * @param name The storage name.
692      * @return The storage builder.
693      */
694     public Builder<E> withName(String name) {
695       this.name = requireNonNull(name, "name cannot be null");
696       return this;
697     }
698
699     /**
700      * Sets the log storage level, returning the builder for method chaining.
701      * <p>
702      * The storage level indicates how individual entries should be persisted in the journal.
703      *
704      * @param storageLevel The log storage level.
705      * @return The storage builder.
706      */
707     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
708       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
709       return this;
710     }
711
712     /**
713      * Sets the log directory, returning the builder for method chaining.
714      * <p>
715      * The log will write segment files into the provided directory.
716      *
717      * @param directory The log directory.
718      * @return The storage builder.
719      * @throws NullPointerException If the {@code directory} is {@code null}
720      */
721     public Builder<E> withDirectory(String directory) {
722       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
723     }
724
725     /**
726      * Sets the log directory, returning the builder for method chaining.
727      * <p>
728      * The log will write segment files into the provided directory.
729      *
730      * @param directory The log directory.
731      * @return The storage builder.
732      * @throws NullPointerException If the {@code directory} is {@code null}
733      */
734     public Builder<E> withDirectory(File directory) {
735       this.directory = requireNonNull(directory, "directory cannot be null");
736       return this;
737     }
738
739     /**
740      * Sets the journal namespace, returning the builder for method chaining.
741      *
742      * @param namespace The journal serializer.
743      * @return The journal builder.
744      */
745     public Builder<E> withNamespace(JournalSerdes namespace) {
746       this.namespace = requireNonNull(namespace, "namespace cannot be null");
747       return this;
748     }
749
750     /**
751      * Sets the maximum segment size in bytes, returning the builder for method chaining.
752      * <p>
753      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
754      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
755      * segment and append new entries to that segment.
756      * <p>
757      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
758      *
759      * @param maxSegmentSize The maximum segment size in bytes.
760      * @return The storage builder.
761      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
762      */
763     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
764       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
765       this.maxSegmentSize = maxSegmentSize;
766       return this;
767     }
768
769     /**
770      * Sets the maximum entry size in bytes, returning the builder for method chaining.
771      *
772      * @param maxEntrySize the maximum entry size in bytes
773      * @return the storage builder
774      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
775      */
776     public Builder<E> withMaxEntrySize(int maxEntrySize) {
777       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
778       this.maxEntrySize = maxEntrySize;
779       return this;
780     }
781
782     /**
783      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
784      * <p>
785      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
786      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
787      * new segment and append new entries to that segment.
788      * <p>
789      * By default, the maximum entries per segment is {@code 1024 * 1024}.
790      *
791      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
792      * @return The storage builder.
793      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
794      *     per segment
795      * @deprecated since 3.0.2
796      */
797     @Deprecated
798     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
799       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
800       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
801           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
802       this.maxEntriesPerSegment = maxEntriesPerSegment;
803       return this;
804     }
805
806     /**
807      * Sets the journal index density.
808      * <p>
809      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
810      * in-memory index for faster seeking.
811      *
812      * @param indexDensity the index density
813      * @return the journal builder
814      * @throws IllegalArgumentException if the density is not between 0 and 1
815      */
816     public Builder<E> withIndexDensity(double indexDensity) {
817       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
818       this.indexDensity = indexDensity;
819       return this;
820     }
821
822     /**
823      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
824      * chaining.
825      * <p>
826      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
827      * committed in a given segment.
828      *
829      * @return The storage builder.
830      */
831     public Builder<E> withFlushOnCommit() {
832       return withFlushOnCommit(true);
833     }
834
835     /**
836      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
837      * chaining.
838      * <p>
839      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
840      * committed in a given segment.
841      *
842      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
843      * @return The storage builder.
844      */
845     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
846       this.flushOnCommit = flushOnCommit;
847       return this;
848     }
849
850     /**
851      * Build the {@link SegmentedJournal}.
852      *
853      * @return A new {@link SegmentedJournal}.
854      */
855     public SegmentedJournal<E> build() {
856       return new SegmentedJournal<>(
857           name,
858           storageLevel,
859           directory,
860           namespace,
861           maxSegmentSize,
862           maxEntrySize,
863           maxEntriesPerSegment,
864           indexDensity,
865           flushOnCommit);
866     }
867   }
868 }