Hide SegmentedJournal{Reader,Writer}
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.SortedMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentNavigableMap;
31 import java.util.concurrent.ConcurrentSkipListMap;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import static com.google.common.base.Preconditions.checkArgument;
36 import static com.google.common.base.Preconditions.checkState;
37 import static java.util.Objects.requireNonNull;
38
39 /**
40  * Segmented journal.
41  */
42 public final class SegmentedJournal<E> implements Journal<E> {
43   /**
44    * Returns a new Raft log builder.
45    *
46    * @return A new Raft log builder.
47    */
48   public static <E> Builder<E> builder() {
49     return new Builder<>();
50   }
51
52   private static final int SEGMENT_BUFFER_FACTOR = 3;
53
54   private final Logger log = LoggerFactory.getLogger(getClass());
55   private final String name;
56   private final StorageLevel storageLevel;
57   private final File directory;
58   private final JournalSerdes namespace;
59   private final int maxSegmentSize;
60   private final int maxEntrySize;
61   private final int maxEntriesPerSegment;
62   private final double indexDensity;
63   private final boolean flushOnCommit;
64   private final SegmentedJournalWriter<E> writer;
65   private volatile long commitIndex;
66
67   private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
68   private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
69   private JournalSegment<E> currentSegment;
70
71   private volatile boolean open = true;
72
73   public SegmentedJournal(
74       String name,
75       StorageLevel storageLevel,
76       File directory,
77       JournalSerdes namespace,
78       int maxSegmentSize,
79       int maxEntrySize,
80       int maxEntriesPerSegment,
81       double indexDensity,
82       boolean flushOnCommit) {
83     this.name = requireNonNull(name, "name cannot be null");
84     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
85     this.directory = requireNonNull(directory, "directory cannot be null");
86     this.namespace = requireNonNull(namespace, "namespace cannot be null");
87     this.maxSegmentSize = maxSegmentSize;
88     this.maxEntrySize = maxEntrySize;
89     this.maxEntriesPerSegment = maxEntriesPerSegment;
90     this.indexDensity = indexDensity;
91     this.flushOnCommit = flushOnCommit;
92     open();
93     this.writer = new SegmentedJournalWriter<>(this);
94   }
95
96   /**
97    * Returns the segment file name prefix.
98    *
99    * @return The segment file name prefix.
100    */
101   public String name() {
102     return name;
103   }
104
105   /**
106    * Returns the storage directory.
107    * <p>
108    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
109    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
110    * when the log is opened.
111    *
112    * @return The storage directory.
113    */
114   public File directory() {
115     return directory;
116   }
117
118   /**
119    * Returns the storage level.
120    * <p>
121    * The storage level dictates how entries within individual journal segments should be stored.
122    *
123    * @return The storage level.
124    */
125   public StorageLevel storageLevel() {
126     return storageLevel;
127   }
128
129   /**
130    * Returns the maximum journal segment size.
131    * <p>
132    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
133    *
134    * @return The maximum segment size in bytes.
135    */
136   public int maxSegmentSize() {
137     return maxSegmentSize;
138   }
139
140   /**
141    * Returns the maximum journal entry size.
142    * <p>
143    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
144    *
145    * @return the maximum entry size in bytes
146    */
147   public int maxEntrySize() {
148     return maxEntrySize;
149   }
150
151   /**
152    * Returns the maximum number of entries per segment.
153    * <p>
154    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
155    * in a journal.
156    *
157    * @return The maximum number of entries per segment.
158    * @deprecated since 3.0.2
159    */
160   @Deprecated
161   public int maxEntriesPerSegment() {
162     return maxEntriesPerSegment;
163   }
164
165   /**
166    * Returns the collection of journal segments.
167    *
168    * @return the collection of journal segments
169    */
170   public Collection<JournalSegment<E>> segments() {
171     return segments.values();
172   }
173
174   /**
175    * Returns the collection of journal segments with indexes greater than the given index.
176    *
177    * @param index the starting index
178    * @return the journal segments starting with indexes greater than or equal to the given index
179    */
180   public Collection<JournalSegment<E>> segments(long index) {
181     return segments.tailMap(index).values();
182   }
183
184   /**
185    * Returns the total size of the journal.
186    *
187    * @return the total size of the journal
188    */
189   public long size() {
190     return segments.values().stream()
191         .mapToLong(segment -> segment.size())
192         .sum();
193   }
194
195   @Override
196   public JournalWriter<E> writer() {
197     return writer;
198   }
199
200   @Override
201   public JournalReader<E> openReader(long index) {
202     return openReader(index, JournalReader.Mode.ALL);
203   }
204
205   /**
206    * Opens a new Raft log reader with the given reader mode.
207    *
208    * @param index The index from which to begin reading entries.
209    * @param mode The mode in which to read entries.
210    * @return The Raft log reader.
211    */
212   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
213     final var reader = new SegmentedJournalReader<>(this, index, mode);
214     readers.add(reader);
215     return reader;
216   }
217
218   /**
219    * Opens the segments.
220    */
221   private synchronized void open() {
222     // Load existing log segments from disk.
223     for (JournalSegment<E> segment : loadSegments()) {
224       segments.put(segment.descriptor().index(), segment);
225     }
226
227     // If a segment doesn't already exist, create an initial segment starting at index 1.
228     if (!segments.isEmpty()) {
229       currentSegment = segments.lastEntry().getValue();
230     } else {
231       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
232           .withId(1)
233           .withIndex(1)
234           .withMaxSegmentSize(maxSegmentSize)
235           .withMaxEntries(maxEntriesPerSegment)
236           .build();
237
238       currentSegment = createSegment(descriptor);
239       currentSegment.descriptor().update(System.currentTimeMillis());
240
241       segments.put(1L, currentSegment);
242     }
243   }
244
245   /**
246    * Asserts that the manager is open.
247    *
248    * @throws IllegalStateException if the segment manager is not open
249    */
250   private void assertOpen() {
251     checkState(currentSegment != null, "journal not open");
252   }
253
254   /**
255    * Asserts that enough disk space is available to allocate a new segment.
256    */
257   private void assertDiskSpace() {
258     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
259       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
260     }
261   }
262
263   /**
264    * Resets the current segment, creating a new segment if necessary.
265    */
266   private synchronized void resetCurrentSegment() {
267     JournalSegment<E> lastSegment = getLastSegment();
268     if (lastSegment != null) {
269       currentSegment = lastSegment;
270     } else {
271       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
272           .withId(1)
273           .withIndex(1)
274           .withMaxSegmentSize(maxSegmentSize)
275           .withMaxEntries(maxEntriesPerSegment)
276           .build();
277
278       currentSegment = createSegment(descriptor);
279
280       segments.put(1L, currentSegment);
281     }
282   }
283
284   /**
285    * Resets and returns the first segment in the journal.
286    *
287    * @param index the starting index of the journal
288    * @return the first segment
289    */
290   JournalSegment<E> resetSegments(long index) {
291     assertOpen();
292
293     // If the index already equals the first segment index, skip the reset.
294     JournalSegment<E> firstSegment = getFirstSegment();
295     if (index == firstSegment.index()) {
296       return firstSegment;
297     }
298
299     for (JournalSegment<E> segment : segments.values()) {
300       segment.close();
301       segment.delete();
302     }
303     segments.clear();
304
305     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
306         .withId(1)
307         .withIndex(index)
308         .withMaxSegmentSize(maxSegmentSize)
309         .withMaxEntries(maxEntriesPerSegment)
310         .build();
311     currentSegment = createSegment(descriptor);
312     segments.put(index, currentSegment);
313     return currentSegment;
314   }
315
316   /**
317    * Returns the first segment in the log.
318    *
319    * @throws IllegalStateException if the segment manager is not open
320    */
321   JournalSegment<E> getFirstSegment() {
322     assertOpen();
323     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
324     return segment != null ? segment.getValue() : null;
325   }
326
327   /**
328    * Returns the last segment in the log.
329    *
330    * @throws IllegalStateException if the segment manager is not open
331    */
332   JournalSegment<E> getLastSegment() {
333     assertOpen();
334     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
335     return segment != null ? segment.getValue() : null;
336   }
337
338   /**
339    * Creates and returns the next segment.
340    *
341    * @return The next segment.
342    * @throws IllegalStateException if the segment manager is not open
343    */
344   synchronized JournalSegment<E> getNextSegment() {
345     assertOpen();
346     assertDiskSpace();
347
348     JournalSegment<E> lastSegment = getLastSegment();
349     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
350         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
351         .withIndex(currentSegment.lastIndex() + 1)
352         .withMaxSegmentSize(maxSegmentSize)
353         .withMaxEntries(maxEntriesPerSegment)
354         .build();
355
356     currentSegment = createSegment(descriptor);
357
358     segments.put(descriptor.index(), currentSegment);
359     return currentSegment;
360   }
361
362   /**
363    * Returns the segment following the segment with the given ID.
364    *
365    * @param index The segment index with which to look up the next segment.
366    * @return The next segment for the given index.
367    */
368   JournalSegment<E> getNextSegment(long index) {
369     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
370     return nextSegment != null ? nextSegment.getValue() : null;
371   }
372
373   /**
374    * Returns the segment for the given index.
375    *
376    * @param index The index for which to return the segment.
377    * @throws IllegalStateException if the segment manager is not open
378    */
379   synchronized JournalSegment<E> getSegment(long index) {
380     assertOpen();
381     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
382     if (currentSegment != null && index > currentSegment.index()) {
383       return currentSegment;
384     }
385
386     // If the index is in another segment, get the entry with the next lowest first index.
387     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
388     if (segment != null) {
389       return segment.getValue();
390     }
391     return getFirstSegment();
392   }
393
394   /**
395    * Removes a segment.
396    *
397    * @param segment The segment to remove.
398    */
399   synchronized void removeSegment(JournalSegment<E> segment) {
400     segments.remove(segment.index());
401     segment.close();
402     segment.delete();
403     resetCurrentSegment();
404   }
405
406   /**
407    * Creates a new segment.
408    */
409   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
410     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
411
412     RandomAccessFile raf;
413     FileChannel channel;
414     try {
415       raf = new RandomAccessFile(segmentFile, "rw");
416       raf.setLength(descriptor.maxSegmentSize());
417       channel =  raf.getChannel();
418     } catch (IOException e) {
419       throw new StorageException(e);
420     }
421
422     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
423     descriptor.copyTo(buffer);
424     buffer.flip();
425     try {
426       channel.write(buffer);
427     } catch (IOException e) {
428       throw new StorageException(e);
429     } finally {
430       try {
431         channel.close();
432         raf.close();
433       } catch (IOException e) {
434       }
435     }
436     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
437     log.debug("Created segment: {}", segment);
438     return segment;
439   }
440
441   /**
442    * Creates a new segment instance.
443    *
444    * @param segmentFile The segment file.
445    * @param descriptor The segment descriptor.
446    * @return The segment instance.
447    */
448   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
449     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
450   }
451
452   /**
453    * Loads a segment.
454    */
455   private JournalSegment<E> loadSegment(long segmentId) {
456     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
457     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
458     try (FileChannel channel = openChannel(segmentFile)) {
459       channel.read(buffer);
460       buffer.flip();
461       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
462       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
463       log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
464       return segment;
465     } catch (IOException e) {
466       throw new StorageException(e);
467     }
468   }
469
470   private FileChannel openChannel(File file) {
471     try {
472       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
473     } catch (IOException e) {
474       throw new StorageException(e);
475     }
476   }
477
478   /**
479    * Loads all segments from disk.
480    *
481    * @return A collection of segments for the log.
482    */
483   protected Collection<JournalSegment<E>> loadSegments() {
484     // Ensure log directories are created.
485     directory.mkdirs();
486
487     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
488
489     // Iterate through all files in the log directory.
490     for (File file : directory.listFiles(File::isFile)) {
491
492       // If the file looks like a segment file, attempt to load the segment.
493       if (JournalSegmentFile.isSegmentFile(name, file)) {
494         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
495         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
496         try (FileChannel channel = openChannel(file)) {
497           channel.read(buffer);
498           buffer.flip();
499         } catch (IOException e) {
500           throw new StorageException(e);
501         }
502
503         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
504
505         // Load the segment.
506         JournalSegment<E> segment = loadSegment(descriptor.id());
507
508         // Add the segment to the segments list.
509         log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
510         segments.put(segment.index(), segment);
511       }
512     }
513
514     // Verify that all the segments in the log align with one another.
515     JournalSegment<E> previousSegment = null;
516     boolean corrupted = false;
517     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
518     while (iterator.hasNext()) {
519       JournalSegment<E> segment = iterator.next().getValue();
520       if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
521         log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
522         corrupted = true;
523       }
524       if (corrupted) {
525         segment.close();
526         segment.delete();
527         iterator.remove();
528       }
529       previousSegment = segment;
530     }
531
532     return segments.values();
533   }
534
535   /**
536    * Resets journal readers to the given head.
537    *
538    * @param index The index at which to reset readers.
539    */
540   void resetHead(long index) {
541     for (SegmentedJournalReader<E> reader : readers) {
542       if (reader.getNextIndex() < index) {
543         reader.reset(index);
544       }
545     }
546   }
547
548   /**
549    * Resets journal readers to the given tail.
550    *
551    * @param index The index at which to reset readers.
552    */
553   void resetTail(long index) {
554     for (SegmentedJournalReader<E> reader : readers) {
555       if (reader.getNextIndex() >= index) {
556         reader.reset(index);
557       }
558     }
559   }
560
561   void closeReader(SegmentedJournalReader<E> reader) {
562     readers.remove(reader);
563   }
564
565   @Override
566   public boolean isOpen() {
567     return open;
568   }
569
570   /**
571    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
572    *
573    * @param index the index from which to remove segments
574    * @return indicates whether a segment can be removed from the journal
575    */
576   public boolean isCompactable(long index) {
577     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
578     return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
579   }
580
581   /**
582    * Returns the index of the last segment in the log.
583    *
584    * @param index the compaction index
585    * @return the starting index of the last segment in the log
586    */
587   public long getCompactableIndex(long index) {
588     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
589     return segmentEntry != null ? segmentEntry.getValue().index() : 0;
590   }
591
592   /**
593    * Compacts the journal up to the given index.
594    * <p>
595    * The semantics of compaction are not specified by this interface.
596    *
597    * @param index The index up to which to compact the journal.
598    */
599   public void compact(long index) {
600     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
601     if (segmentEntry != null) {
602       SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
603       if (!compactSegments.isEmpty()) {
604         log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
605         for (JournalSegment<E> segment : compactSegments.values()) {
606           log.trace("Deleting segment: {}", segment);
607           segment.close();
608           segment.delete();
609         }
610         compactSegments.clear();
611         resetHead(segmentEntry.getValue().index());
612       }
613     }
614   }
615
616   @Override
617   public void close() {
618     segments.values().forEach(segment -> {
619       log.debug("Closing segment: {}", segment);
620       segment.close();
621     });
622     currentSegment = null;
623     open = false;
624   }
625
626   /**
627    * Returns whether {@code flushOnCommit} is enabled for the log.
628    *
629    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
630    */
631   boolean isFlushOnCommit() {
632     return flushOnCommit;
633   }
634
635   /**
636    * Commits entries up to the given index.
637    *
638    * @param index The index up to which to commit entries.
639    */
640   void setCommitIndex(long index) {
641     this.commitIndex = index;
642   }
643
644   /**
645    * Returns the Raft log commit index.
646    *
647    * @return The Raft log commit index.
648    */
649   long getCommitIndex() {
650     return commitIndex;
651   }
652
653   /**
654    * Raft log builder.
655    */
656   public static final class Builder<E> {
657     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
658     private static final String DEFAULT_NAME = "atomix";
659     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
660     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
661     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
662     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
663     private static final double DEFAULT_INDEX_DENSITY = .005;
664
665     private String name = DEFAULT_NAME;
666     private StorageLevel storageLevel = StorageLevel.DISK;
667     private File directory = new File(DEFAULT_DIRECTORY);
668     private JournalSerdes namespace;
669     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
670     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
671     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
672     private double indexDensity = DEFAULT_INDEX_DENSITY;
673     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
674
675     protected Builder() {
676     }
677
678     /**
679      * Sets the storage name.
680      *
681      * @param name The storage name.
682      * @return The storage builder.
683      */
684     public Builder<E> withName(String name) {
685       this.name = requireNonNull(name, "name cannot be null");
686       return this;
687     }
688
689     /**
690      * Sets the log storage level, returning the builder for method chaining.
691      * <p>
692      * The storage level indicates how individual entries should be persisted in the journal.
693      *
694      * @param storageLevel The log storage level.
695      * @return The storage builder.
696      */
697     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
698       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
699       return this;
700     }
701
702     /**
703      * Sets the log directory, returning the builder for method chaining.
704      * <p>
705      * The log will write segment files into the provided directory.
706      *
707      * @param directory The log directory.
708      * @return The storage builder.
709      * @throws NullPointerException If the {@code directory} is {@code null}
710      */
711     public Builder<E> withDirectory(String directory) {
712       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
713     }
714
715     /**
716      * Sets the log directory, returning the builder for method chaining.
717      * <p>
718      * The log will write segment files into the provided directory.
719      *
720      * @param directory The log directory.
721      * @return The storage builder.
722      * @throws NullPointerException If the {@code directory} is {@code null}
723      */
724     public Builder<E> withDirectory(File directory) {
725       this.directory = requireNonNull(directory, "directory cannot be null");
726       return this;
727     }
728
729     /**
730      * Sets the journal namespace, returning the builder for method chaining.
731      *
732      * @param namespace The journal serializer.
733      * @return The journal builder.
734      */
735     public Builder<E> withNamespace(JournalSerdes namespace) {
736       this.namespace = requireNonNull(namespace, "namespace cannot be null");
737       return this;
738     }
739
740     /**
741      * Sets the maximum segment size in bytes, returning the builder for method chaining.
742      * <p>
743      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
744      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
745      * segment and append new entries to that segment.
746      * <p>
747      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
748      *
749      * @param maxSegmentSize The maximum segment size in bytes.
750      * @return The storage builder.
751      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
752      */
753     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
754       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
755       this.maxSegmentSize = maxSegmentSize;
756       return this;
757     }
758
759     /**
760      * Sets the maximum entry size in bytes, returning the builder for method chaining.
761      *
762      * @param maxEntrySize the maximum entry size in bytes
763      * @return the storage builder
764      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
765      */
766     public Builder<E> withMaxEntrySize(int maxEntrySize) {
767       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
768       this.maxEntrySize = maxEntrySize;
769       return this;
770     }
771
772     /**
773      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
774      * <p>
775      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
776      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
777      * new segment and append new entries to that segment.
778      * <p>
779      * By default, the maximum entries per segment is {@code 1024 * 1024}.
780      *
781      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
782      * @return The storage builder.
783      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
784      *     per segment
785      * @deprecated since 3.0.2
786      */
787     @Deprecated
788     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
789       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
790       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
791           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
792       this.maxEntriesPerSegment = maxEntriesPerSegment;
793       return this;
794     }
795
796     /**
797      * Sets the journal index density.
798      * <p>
799      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
800      * in-memory index for faster seeking.
801      *
802      * @param indexDensity the index density
803      * @return the journal builder
804      * @throws IllegalArgumentException if the density is not between 0 and 1
805      */
806     public Builder<E> withIndexDensity(double indexDensity) {
807       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
808       this.indexDensity = indexDensity;
809       return this;
810     }
811
812     /**
813      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
814      * chaining.
815      * <p>
816      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
817      * committed in a given segment.
818      *
819      * @return The storage builder.
820      */
821     public Builder<E> withFlushOnCommit() {
822       return withFlushOnCommit(true);
823     }
824
825     /**
826      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
827      * chaining.
828      * <p>
829      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
830      * committed in a given segment.
831      *
832      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
833      * @return The storage builder.
834      */
835     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
836       this.flushOnCommit = flushOnCommit;
837       return this;
838     }
839
840     /**
841      * Build the {@link SegmentedJournal}.
842      *
843      * @return A new {@link SegmentedJournal}.
844      */
845     public SegmentedJournal<E> build() {
846       return new SegmentedJournal<>(
847           name,
848           storageLevel,
849           directory,
850           namespace,
851           maxSegmentSize,
852           maxEntrySize,
853           maxEntriesPerSegment,
854           indexDensity,
855           flushOnCommit);
856     }
857   }
858 }