JournalReader is not an Iterator
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.SortedMap;
28 import java.util.TreeMap;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentNavigableMap;
31 import java.util.concurrent.ConcurrentSkipListMap;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import static com.google.common.base.Preconditions.checkArgument;
36 import static com.google.common.base.Preconditions.checkState;
37 import static java.util.Objects.requireNonNull;
38
39 /**
40  * Segmented journal.
41  */
42 public final class SegmentedJournal<E> implements Journal<E> {
43   /**
44    * Returns a new Raft log builder.
45    *
46    * @return A new Raft log builder.
47    */
48   public static <E> Builder<E> builder() {
49     return new Builder<>();
50   }
51
52   private static final int SEGMENT_BUFFER_FACTOR = 3;
53
54   private final Logger log = LoggerFactory.getLogger(getClass());
55   private final String name;
56   private final StorageLevel storageLevel;
57   private final File directory;
58   private final JournalSerdes namespace;
59   private final int maxSegmentSize;
60   private final int maxEntrySize;
61   private final int maxEntriesPerSegment;
62   private final double indexDensity;
63   private final boolean flushOnCommit;
64   private final SegmentedJournalWriter<E> writer;
65   private volatile long commitIndex;
66
67   private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
68   private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
69   private JournalSegment<E> currentSegment;
70
71   private volatile boolean open = true;
72
73   public SegmentedJournal(
74       String name,
75       StorageLevel storageLevel,
76       File directory,
77       JournalSerdes namespace,
78       int maxSegmentSize,
79       int maxEntrySize,
80       int maxEntriesPerSegment,
81       double indexDensity,
82       boolean flushOnCommit) {
83     this.name = requireNonNull(name, "name cannot be null");
84     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
85     this.directory = requireNonNull(directory, "directory cannot be null");
86     this.namespace = requireNonNull(namespace, "namespace cannot be null");
87     this.maxSegmentSize = maxSegmentSize;
88     this.maxEntrySize = maxEntrySize;
89     this.maxEntriesPerSegment = maxEntriesPerSegment;
90     this.indexDensity = indexDensity;
91     this.flushOnCommit = flushOnCommit;
92     open();
93     this.writer = new SegmentedJournalWriter<>(this);
94   }
95
96   /**
97    * Returns the segment file name prefix.
98    *
99    * @return The segment file name prefix.
100    */
101   public String name() {
102     return name;
103   }
104
105   /**
106    * Returns the storage directory.
107    * <p>
108    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
109    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
110    * when the log is opened.
111    *
112    * @return The storage directory.
113    */
114   public File directory() {
115     return directory;
116   }
117
118   /**
119    * Returns the storage level.
120    * <p>
121    * The storage level dictates how entries within individual journal segments should be stored.
122    *
123    * @return The storage level.
124    */
125   public StorageLevel storageLevel() {
126     return storageLevel;
127   }
128
129   /**
130    * Returns the maximum journal segment size.
131    * <p>
132    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
133    *
134    * @return The maximum segment size in bytes.
135    */
136   public int maxSegmentSize() {
137     return maxSegmentSize;
138   }
139
140   /**
141    * Returns the maximum journal entry size.
142    * <p>
143    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
144    *
145    * @return the maximum entry size in bytes
146    */
147   public int maxEntrySize() {
148     return maxEntrySize;
149   }
150
151   /**
152    * Returns the maximum number of entries per segment.
153    * <p>
154    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
155    * in a journal.
156    *
157    * @return The maximum number of entries per segment.
158    * @deprecated since 3.0.2
159    */
160   @Deprecated
161   public int maxEntriesPerSegment() {
162     return maxEntriesPerSegment;
163   }
164
165   /**
166    * Returns the collection of journal segments.
167    *
168    * @return the collection of journal segments
169    */
170   public Collection<JournalSegment<E>> segments() {
171     return segments.values();
172   }
173
174   /**
175    * Returns the collection of journal segments with indexes greater than the given index.
176    *
177    * @param index the starting index
178    * @return the journal segments starting with indexes greater than or equal to the given index
179    */
180   public Collection<JournalSegment<E>> segments(long index) {
181     return segments.tailMap(index).values();
182   }
183
184   /**
185    * Returns the total size of the journal.
186    *
187    * @return the total size of the journal
188    */
189   public long size() {
190     return segments.values().stream()
191         .mapToLong(segment -> segment.size())
192         .sum();
193   }
194
195   @Override
196   public JournalWriter<E> writer() {
197     return writer;
198   }
199
200   @Override
201   public JournalReader<E> openReader(long index) {
202     return openReader(index, JournalReader.Mode.ALL);
203   }
204
205   /**
206    * Opens a new Raft log reader with the given reader mode.
207    *
208    * @param index The index from which to begin reading entries.
209    * @param mode The mode in which to read entries.
210    * @return The Raft log reader.
211    */
212   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
213     final var segment = getSegment(index);
214     final var reader = switch (mode) {
215       case ALL -> new SegmentedJournalReader<>(this, segment);
216       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
217     };
218
219     // Forward reader to specified index
220     long next = reader.getNextIndex();
221     while (index > next && reader.tryNext() != null) {
222         next = reader.getNextIndex();
223     }
224
225     readers.add(reader);
226     return reader;
227   }
228
229   /**
230    * Opens the segments.
231    */
232   private synchronized void open() {
233     // Load existing log segments from disk.
234     for (JournalSegment<E> segment : loadSegments()) {
235       segments.put(segment.descriptor().index(), segment);
236     }
237
238     // If a segment doesn't already exist, create an initial segment starting at index 1.
239     if (!segments.isEmpty()) {
240       currentSegment = segments.lastEntry().getValue();
241     } else {
242       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
243           .withId(1)
244           .withIndex(1)
245           .withMaxSegmentSize(maxSegmentSize)
246           .withMaxEntries(maxEntriesPerSegment)
247           .build();
248
249       currentSegment = createSegment(descriptor);
250       currentSegment.descriptor().update(System.currentTimeMillis());
251
252       segments.put(1L, currentSegment);
253     }
254   }
255
256   /**
257    * Asserts that the manager is open.
258    *
259    * @throws IllegalStateException if the segment manager is not open
260    */
261   private void assertOpen() {
262     checkState(currentSegment != null, "journal not open");
263   }
264
265   /**
266    * Asserts that enough disk space is available to allocate a new segment.
267    */
268   private void assertDiskSpace() {
269     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
270       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
271     }
272   }
273
274   /**
275    * Resets the current segment, creating a new segment if necessary.
276    */
277   private synchronized void resetCurrentSegment() {
278     JournalSegment<E> lastSegment = getLastSegment();
279     if (lastSegment != null) {
280       currentSegment = lastSegment;
281     } else {
282       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
283           .withId(1)
284           .withIndex(1)
285           .withMaxSegmentSize(maxSegmentSize)
286           .withMaxEntries(maxEntriesPerSegment)
287           .build();
288
289       currentSegment = createSegment(descriptor);
290
291       segments.put(1L, currentSegment);
292     }
293   }
294
295   /**
296    * Resets and returns the first segment in the journal.
297    *
298    * @param index the starting index of the journal
299    * @return the first segment
300    */
301   JournalSegment<E> resetSegments(long index) {
302     assertOpen();
303
304     // If the index already equals the first segment index, skip the reset.
305     JournalSegment<E> firstSegment = getFirstSegment();
306     if (index == firstSegment.index()) {
307       return firstSegment;
308     }
309
310     for (JournalSegment<E> segment : segments.values()) {
311       segment.close();
312       segment.delete();
313     }
314     segments.clear();
315
316     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
317         .withId(1)
318         .withIndex(index)
319         .withMaxSegmentSize(maxSegmentSize)
320         .withMaxEntries(maxEntriesPerSegment)
321         .build();
322     currentSegment = createSegment(descriptor);
323     segments.put(index, currentSegment);
324     return currentSegment;
325   }
326
327   /**
328    * Returns the first segment in the log.
329    *
330    * @throws IllegalStateException if the segment manager is not open
331    */
332   JournalSegment<E> getFirstSegment() {
333     assertOpen();
334     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
335     return segment != null ? segment.getValue() : null;
336   }
337
338   /**
339    * Returns the last segment in the log.
340    *
341    * @throws IllegalStateException if the segment manager is not open
342    */
343   JournalSegment<E> getLastSegment() {
344     assertOpen();
345     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
346     return segment != null ? segment.getValue() : null;
347   }
348
349   /**
350    * Creates and returns the next segment.
351    *
352    * @return The next segment.
353    * @throws IllegalStateException if the segment manager is not open
354    */
355   synchronized JournalSegment<E> getNextSegment() {
356     assertOpen();
357     assertDiskSpace();
358
359     JournalSegment<E> lastSegment = getLastSegment();
360     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
361         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
362         .withIndex(currentSegment.lastIndex() + 1)
363         .withMaxSegmentSize(maxSegmentSize)
364         .withMaxEntries(maxEntriesPerSegment)
365         .build();
366
367     currentSegment = createSegment(descriptor);
368
369     segments.put(descriptor.index(), currentSegment);
370     return currentSegment;
371   }
372
373   /**
374    * Returns the segment following the segment with the given ID.
375    *
376    * @param index The segment index with which to look up the next segment.
377    * @return The next segment for the given index.
378    */
379   JournalSegment<E> getNextSegment(long index) {
380     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
381     return nextSegment != null ? nextSegment.getValue() : null;
382   }
383
384   /**
385    * Returns the segment for the given index.
386    *
387    * @param index The index for which to return the segment.
388    * @throws IllegalStateException if the segment manager is not open
389    */
390   synchronized JournalSegment<E> getSegment(long index) {
391     assertOpen();
392     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
393     if (currentSegment != null && index > currentSegment.index()) {
394       return currentSegment;
395     }
396
397     // If the index is in another segment, get the entry with the next lowest first index.
398     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
399     if (segment != null) {
400       return segment.getValue();
401     }
402     return getFirstSegment();
403   }
404
405   /**
406    * Removes a segment.
407    *
408    * @param segment The segment to remove.
409    */
410   synchronized void removeSegment(JournalSegment<E> segment) {
411     segments.remove(segment.index());
412     segment.close();
413     segment.delete();
414     resetCurrentSegment();
415   }
416
417   /**
418    * Creates a new segment.
419    */
420   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
421     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
422
423     RandomAccessFile raf;
424     FileChannel channel;
425     try {
426       raf = new RandomAccessFile(segmentFile, "rw");
427       raf.setLength(descriptor.maxSegmentSize());
428       channel =  raf.getChannel();
429     } catch (IOException e) {
430       throw new StorageException(e);
431     }
432
433     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
434     descriptor.copyTo(buffer);
435     buffer.flip();
436     try {
437       channel.write(buffer);
438     } catch (IOException e) {
439       throw new StorageException(e);
440     } finally {
441       try {
442         channel.close();
443         raf.close();
444       } catch (IOException e) {
445       }
446     }
447     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
448     log.debug("Created segment: {}", segment);
449     return segment;
450   }
451
452   /**
453    * Creates a new segment instance.
454    *
455    * @param segmentFile The segment file.
456    * @param descriptor The segment descriptor.
457    * @return The segment instance.
458    */
459   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
460     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
461   }
462
463   /**
464    * Loads a segment.
465    */
466   private JournalSegment<E> loadSegment(long segmentId) {
467     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
468     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
469     try (FileChannel channel = openChannel(segmentFile)) {
470       channel.read(buffer);
471       buffer.flip();
472       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
473       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
474       log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
475       return segment;
476     } catch (IOException e) {
477       throw new StorageException(e);
478     }
479   }
480
481   private FileChannel openChannel(File file) {
482     try {
483       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
484     } catch (IOException e) {
485       throw new StorageException(e);
486     }
487   }
488
489   /**
490    * Loads all segments from disk.
491    *
492    * @return A collection of segments for the log.
493    */
494   protected Collection<JournalSegment<E>> loadSegments() {
495     // Ensure log directories are created.
496     directory.mkdirs();
497
498     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
499
500     // Iterate through all files in the log directory.
501     for (File file : directory.listFiles(File::isFile)) {
502
503       // If the file looks like a segment file, attempt to load the segment.
504       if (JournalSegmentFile.isSegmentFile(name, file)) {
505         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
506         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
507         try (FileChannel channel = openChannel(file)) {
508           channel.read(buffer);
509           buffer.flip();
510         } catch (IOException e) {
511           throw new StorageException(e);
512         }
513
514         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
515
516         // Load the segment.
517         JournalSegment<E> segment = loadSegment(descriptor.id());
518
519         // Add the segment to the segments list.
520         log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
521         segments.put(segment.index(), segment);
522       }
523     }
524
525     // Verify that all the segments in the log align with one another.
526     JournalSegment<E> previousSegment = null;
527     boolean corrupted = false;
528     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
529     while (iterator.hasNext()) {
530       JournalSegment<E> segment = iterator.next().getValue();
531       if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
532         log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
533         corrupted = true;
534       }
535       if (corrupted) {
536         segment.close();
537         segment.delete();
538         iterator.remove();
539       }
540       previousSegment = segment;
541     }
542
543     return segments.values();
544   }
545
546   /**
547    * Resets journal readers to the given head.
548    *
549    * @param index The index at which to reset readers.
550    */
551   void resetHead(long index) {
552     for (SegmentedJournalReader<E> reader : readers) {
553       if (reader.getNextIndex() < index) {
554         reader.reset(index);
555       }
556     }
557   }
558
559   /**
560    * Resets journal readers to the given tail.
561    *
562    * @param index The index at which to reset readers.
563    */
564   void resetTail(long index) {
565     for (SegmentedJournalReader<E> reader : readers) {
566       if (reader.getNextIndex() >= index) {
567         reader.reset(index);
568       }
569     }
570   }
571
572   void closeReader(SegmentedJournalReader<E> reader) {
573     readers.remove(reader);
574   }
575
576   @Override
577   public boolean isOpen() {
578     return open;
579   }
580
581   /**
582    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
583    *
584    * @param index the index from which to remove segments
585    * @return indicates whether a segment can be removed from the journal
586    */
587   public boolean isCompactable(long index) {
588     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
589     return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
590   }
591
592   /**
593    * Returns the index of the last segment in the log.
594    *
595    * @param index the compaction index
596    * @return the starting index of the last segment in the log
597    */
598   public long getCompactableIndex(long index) {
599     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
600     return segmentEntry != null ? segmentEntry.getValue().index() : 0;
601   }
602
603   /**
604    * Compacts the journal up to the given index.
605    * <p>
606    * The semantics of compaction are not specified by this interface.
607    *
608    * @param index The index up to which to compact the journal.
609    */
610   public void compact(long index) {
611     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
612     if (segmentEntry != null) {
613       SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
614       if (!compactSegments.isEmpty()) {
615         log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
616         for (JournalSegment<E> segment : compactSegments.values()) {
617           log.trace("Deleting segment: {}", segment);
618           segment.close();
619           segment.delete();
620         }
621         compactSegments.clear();
622         resetHead(segmentEntry.getValue().index());
623       }
624     }
625   }
626
627   @Override
628   public void close() {
629     segments.values().forEach(segment -> {
630       log.debug("Closing segment: {}", segment);
631       segment.close();
632     });
633     currentSegment = null;
634     open = false;
635   }
636
637   /**
638    * Returns whether {@code flushOnCommit} is enabled for the log.
639    *
640    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
641    */
642   boolean isFlushOnCommit() {
643     return flushOnCommit;
644   }
645
646   /**
647    * Commits entries up to the given index.
648    *
649    * @param index The index up to which to commit entries.
650    */
651   void setCommitIndex(long index) {
652     this.commitIndex = index;
653   }
654
655   /**
656    * Returns the Raft log commit index.
657    *
658    * @return The Raft log commit index.
659    */
660   long getCommitIndex() {
661     return commitIndex;
662   }
663
664   /**
665    * Raft log builder.
666    */
667   public static final class Builder<E> {
668     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
669     private static final String DEFAULT_NAME = "atomix";
670     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
671     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
672     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
673     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
674     private static final double DEFAULT_INDEX_DENSITY = .005;
675
676     private String name = DEFAULT_NAME;
677     private StorageLevel storageLevel = StorageLevel.DISK;
678     private File directory = new File(DEFAULT_DIRECTORY);
679     private JournalSerdes namespace;
680     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
681     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
682     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
683     private double indexDensity = DEFAULT_INDEX_DENSITY;
684     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
685
686     protected Builder() {
687     }
688
689     /**
690      * Sets the storage name.
691      *
692      * @param name The storage name.
693      * @return The storage builder.
694      */
695     public Builder<E> withName(String name) {
696       this.name = requireNonNull(name, "name cannot be null");
697       return this;
698     }
699
700     /**
701      * Sets the log storage level, returning the builder for method chaining.
702      * <p>
703      * The storage level indicates how individual entries should be persisted in the journal.
704      *
705      * @param storageLevel The log storage level.
706      * @return The storage builder.
707      */
708     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
709       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
710       return this;
711     }
712
713     /**
714      * Sets the log directory, returning the builder for method chaining.
715      * <p>
716      * The log will write segment files into the provided directory.
717      *
718      * @param directory The log directory.
719      * @return The storage builder.
720      * @throws NullPointerException If the {@code directory} is {@code null}
721      */
722     public Builder<E> withDirectory(String directory) {
723       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
724     }
725
726     /**
727      * Sets the log directory, returning the builder for method chaining.
728      * <p>
729      * The log will write segment files into the provided directory.
730      *
731      * @param directory The log directory.
732      * @return The storage builder.
733      * @throws NullPointerException If the {@code directory} is {@code null}
734      */
735     public Builder<E> withDirectory(File directory) {
736       this.directory = requireNonNull(directory, "directory cannot be null");
737       return this;
738     }
739
740     /**
741      * Sets the journal namespace, returning the builder for method chaining.
742      *
743      * @param namespace The journal serializer.
744      * @return The journal builder.
745      */
746     public Builder<E> withNamespace(JournalSerdes namespace) {
747       this.namespace = requireNonNull(namespace, "namespace cannot be null");
748       return this;
749     }
750
751     /**
752      * Sets the maximum segment size in bytes, returning the builder for method chaining.
753      * <p>
754      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
755      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
756      * segment and append new entries to that segment.
757      * <p>
758      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
759      *
760      * @param maxSegmentSize The maximum segment size in bytes.
761      * @return The storage builder.
762      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
763      */
764     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
765       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
766       this.maxSegmentSize = maxSegmentSize;
767       return this;
768     }
769
770     /**
771      * Sets the maximum entry size in bytes, returning the builder for method chaining.
772      *
773      * @param maxEntrySize the maximum entry size in bytes
774      * @return the storage builder
775      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
776      */
777     public Builder<E> withMaxEntrySize(int maxEntrySize) {
778       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
779       this.maxEntrySize = maxEntrySize;
780       return this;
781     }
782
783     /**
784      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
785      * <p>
786      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
787      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
788      * new segment and append new entries to that segment.
789      * <p>
790      * By default, the maximum entries per segment is {@code 1024 * 1024}.
791      *
792      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
793      * @return The storage builder.
794      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
795      *     per segment
796      * @deprecated since 3.0.2
797      */
798     @Deprecated
799     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
800       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
801       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
802           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
803       this.maxEntriesPerSegment = maxEntriesPerSegment;
804       return this;
805     }
806
807     /**
808      * Sets the journal index density.
809      * <p>
810      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
811      * in-memory index for faster seeking.
812      *
813      * @param indexDensity the index density
814      * @return the journal builder
815      * @throws IllegalArgumentException if the density is not between 0 and 1
816      */
817     public Builder<E> withIndexDensity(double indexDensity) {
818       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
819       this.indexDensity = indexDensity;
820       return this;
821     }
822
823     /**
824      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
825      * chaining.
826      * <p>
827      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
828      * committed in a given segment.
829      *
830      * @return The storage builder.
831      */
832     public Builder<E> withFlushOnCommit() {
833       return withFlushOnCommit(true);
834     }
835
836     /**
837      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
838      * chaining.
839      * <p>
840      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
841      * committed in a given segment.
842      *
843      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
844      * @return The storage builder.
845      */
846     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
847       this.flushOnCommit = flushOnCommit;
848       return this;
849     }
850
851     /**
852      * Build the {@link SegmentedJournal}.
853      *
854      * @return A new {@link SegmentedJournal}.
855      */
856     public SegmentedJournal<E> build() {
857       return new SegmentedJournal<>(
858           name,
859           storageLevel,
860           directory,
861           namespace,
862           maxSegmentSize,
863           maxEntrySize,
864           maxEntriesPerSegment,
865           indexDensity,
866           flushOnCommit);
867     }
868   }
869 }