Remove atomix.utils.Builder
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28 import java.util.SortedMap;
29 import java.util.TreeMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31
32 import com.google.common.collect.Sets;
33 import io.atomix.storage.StorageException;
34 import io.atomix.storage.StorageLevel;
35 import io.atomix.utils.serializer.Namespace;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import static com.google.common.base.Preconditions.checkArgument;
40 import static com.google.common.base.Preconditions.checkNotNull;
41 import static com.google.common.base.Preconditions.checkState;
42
43 /**
44  * Segmented journal.
45  */
46 public class SegmentedJournal<E> implements Journal<E> {
47
48   /**
49    * Returns a new Raft log builder.
50    *
51    * @return A new Raft log builder.
52    */
53   public static <E> Builder<E> builder() {
54     return new Builder<>();
55   }
56
57   private static final int SEGMENT_BUFFER_FACTOR = 3;
58
59   private final Logger log = LoggerFactory.getLogger(getClass());
60   private final String name;
61   private final StorageLevel storageLevel;
62   private final File directory;
63   private final Namespace namespace;
64   private final int maxSegmentSize;
65   private final int maxEntrySize;
66   private final int maxEntriesPerSegment;
67   private final double indexDensity;
68   private final boolean flushOnCommit;
69   private final SegmentedJournalWriter<E> writer;
70   private volatile long commitIndex;
71
72   private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
73   private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
74   private JournalSegment<E> currentSegment;
75
76   private volatile boolean open = true;
77
78   public SegmentedJournal(
79       String name,
80       StorageLevel storageLevel,
81       File directory,
82       Namespace namespace,
83       int maxSegmentSize,
84       int maxEntrySize,
85       int maxEntriesPerSegment,
86       double indexDensity,
87       boolean flushOnCommit) {
88     this.name = checkNotNull(name, "name cannot be null");
89     this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
90     this.directory = checkNotNull(directory, "directory cannot be null");
91     this.namespace = checkNotNull(namespace, "namespace cannot be null");
92     this.maxSegmentSize = maxSegmentSize;
93     this.maxEntrySize = maxEntrySize;
94     this.maxEntriesPerSegment = maxEntriesPerSegment;
95     this.indexDensity = indexDensity;
96     this.flushOnCommit = flushOnCommit;
97     open();
98     this.writer = openWriter();
99   }
100
101   /**
102    * Returns the segment file name prefix.
103    *
104    * @return The segment file name prefix.
105    */
106   public String name() {
107     return name;
108   }
109
110   /**
111    * Returns the storage directory.
112    * <p>
113    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
114    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
115    * when the log is opened.
116    *
117    * @return The storage directory.
118    */
119   public File directory() {
120     return directory;
121   }
122
123   /**
124    * Returns the storage level.
125    * <p>
126    * The storage level dictates how entries within individual journal segments should be stored.
127    *
128    * @return The storage level.
129    */
130   public StorageLevel storageLevel() {
131     return storageLevel;
132   }
133
134   /**
135    * Returns the maximum journal segment size.
136    * <p>
137    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
138    *
139    * @return The maximum segment size in bytes.
140    */
141   public int maxSegmentSize() {
142     return maxSegmentSize;
143   }
144
145   /**
146    * Returns the maximum journal entry size.
147    * <p>
148    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
149    *
150    * @return the maximum entry size in bytes
151    */
152   public int maxEntrySize() {
153     return maxEntrySize;
154   }
155
156   /**
157    * Returns the maximum number of entries per segment.
158    * <p>
159    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
160    * in a journal.
161    *
162    * @return The maximum number of entries per segment.
163    * @deprecated since 3.0.2
164    */
165   @Deprecated
166   public int maxEntriesPerSegment() {
167     return maxEntriesPerSegment;
168   }
169
170   /**
171    * Returns the collection of journal segments.
172    *
173    * @return the collection of journal segments
174    */
175   public Collection<JournalSegment<E>> segments() {
176     return segments.values();
177   }
178
179   /**
180    * Returns the collection of journal segments with indexes greater than the given index.
181    *
182    * @param index the starting index
183    * @return the journal segments starting with indexes greater than or equal to the given index
184    */
185   public Collection<JournalSegment<E>> segments(long index) {
186     return segments.tailMap(index).values();
187   }
188
189   /**
190    * Returns the total size of the journal.
191    *
192    * @return the total size of the journal
193    */
194   public long size() {
195     return segments.values().stream()
196         .mapToLong(segment -> segment.size())
197         .sum();
198   }
199
200   @Override
201   public SegmentedJournalWriter<E> writer() {
202     return writer;
203   }
204
205   @Override
206   public SegmentedJournalReader<E> openReader(long index) {
207     return openReader(index, SegmentedJournalReader.Mode.ALL);
208   }
209
210   /**
211    * Opens a new Raft log reader with the given reader mode.
212    *
213    * @param index The index from which to begin reading entries.
214    * @param mode The mode in which to read entries.
215    * @return The Raft log reader.
216    */
217   public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
218     SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
219     readers.add(reader);
220     return reader;
221   }
222
223   /**
224    * Opens a new journal writer.
225    *
226    * @return A new journal writer.
227    */
228   protected SegmentedJournalWriter<E> openWriter() {
229     return new SegmentedJournalWriter<>(this);
230   }
231
232   /**
233    * Opens the segments.
234    */
235   private void open() {
236     // Load existing log segments from disk.
237     for (JournalSegment<E> segment : loadSegments()) {
238       segments.put(segment.descriptor().index(), segment);
239     }
240
241     // If a segment doesn't already exist, create an initial segment starting at index 1.
242     if (!segments.isEmpty()) {
243       currentSegment = segments.lastEntry().getValue();
244     } else {
245       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
246           .withId(1)
247           .withIndex(1)
248           .withMaxSegmentSize(maxSegmentSize)
249           .withMaxEntries(maxEntriesPerSegment)
250           .build();
251
252       currentSegment = createSegment(descriptor);
253       currentSegment.descriptor().update(System.currentTimeMillis());
254
255       segments.put(1L, currentSegment);
256     }
257   }
258
259   /**
260    * Asserts that the manager is open.
261    *
262    * @throws IllegalStateException if the segment manager is not open
263    */
264   private void assertOpen() {
265     checkState(currentSegment != null, "journal not open");
266   }
267
268   /**
269    * Asserts that enough disk space is available to allocate a new segment.
270    */
271   private void assertDiskSpace() {
272     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
273       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
274     }
275   }
276
277   /**
278    * Resets the current segment, creating a new segment if necessary.
279    */
280   private synchronized void resetCurrentSegment() {
281     JournalSegment<E> lastSegment = getLastSegment();
282     if (lastSegment != null) {
283       currentSegment = lastSegment;
284     } else {
285       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
286           .withId(1)
287           .withIndex(1)
288           .withMaxSegmentSize(maxSegmentSize)
289           .withMaxEntries(maxEntriesPerSegment)
290           .build();
291
292       currentSegment = createSegment(descriptor);
293
294       segments.put(1L, currentSegment);
295     }
296   }
297
298   /**
299    * Resets and returns the first segment in the journal.
300    *
301    * @param index the starting index of the journal
302    * @return the first segment
303    */
304   JournalSegment<E> resetSegments(long index) {
305     assertOpen();
306
307     // If the index already equals the first segment index, skip the reset.
308     JournalSegment<E> firstSegment = getFirstSegment();
309     if (index == firstSegment.index()) {
310       return firstSegment;
311     }
312
313     for (JournalSegment<E> segment : segments.values()) {
314       segment.close();
315       segment.delete();
316     }
317     segments.clear();
318
319     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
320         .withId(1)
321         .withIndex(index)
322         .withMaxSegmentSize(maxSegmentSize)
323         .withMaxEntries(maxEntriesPerSegment)
324         .build();
325     currentSegment = createSegment(descriptor);
326     segments.put(index, currentSegment);
327     return currentSegment;
328   }
329
330   /**
331    * Returns the first segment in the log.
332    *
333    * @throws IllegalStateException if the segment manager is not open
334    */
335   JournalSegment<E> getFirstSegment() {
336     assertOpen();
337     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
338     return segment != null ? segment.getValue() : null;
339   }
340
341   /**
342    * Returns the last segment in the log.
343    *
344    * @throws IllegalStateException if the segment manager is not open
345    */
346   JournalSegment<E> getLastSegment() {
347     assertOpen();
348     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
349     return segment != null ? segment.getValue() : null;
350   }
351
352   /**
353    * Creates and returns the next segment.
354    *
355    * @return The next segment.
356    * @throws IllegalStateException if the segment manager is not open
357    */
358   synchronized JournalSegment<E> getNextSegment() {
359     assertOpen();
360     assertDiskSpace();
361
362     JournalSegment<E> lastSegment = getLastSegment();
363     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
364         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
365         .withIndex(currentSegment.lastIndex() + 1)
366         .withMaxSegmentSize(maxSegmentSize)
367         .withMaxEntries(maxEntriesPerSegment)
368         .build();
369
370     currentSegment = createSegment(descriptor);
371
372     segments.put(descriptor.index(), currentSegment);
373     return currentSegment;
374   }
375
376   /**
377    * Returns the segment following the segment with the given ID.
378    *
379    * @param index The segment index with which to look up the next segment.
380    * @return The next segment for the given index.
381    */
382   JournalSegment<E> getNextSegment(long index) {
383     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
384     return nextSegment != null ? nextSegment.getValue() : null;
385   }
386
387   /**
388    * Returns the segment for the given index.
389    *
390    * @param index The index for which to return the segment.
391    * @throws IllegalStateException if the segment manager is not open
392    */
393   synchronized JournalSegment<E> getSegment(long index) {
394     assertOpen();
395     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
396     if (currentSegment != null && index > currentSegment.index()) {
397       return currentSegment;
398     }
399
400     // If the index is in another segment, get the entry with the next lowest first index.
401     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
402     if (segment != null) {
403       return segment.getValue();
404     }
405     return getFirstSegment();
406   }
407
408   /**
409    * Removes a segment.
410    *
411    * @param segment The segment to remove.
412    */
413   synchronized void removeSegment(JournalSegment<E> segment) {
414     segments.remove(segment.index());
415     segment.close();
416     segment.delete();
417     resetCurrentSegment();
418   }
419
420   /**
421    * Creates a new segment.
422    */
423   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
424     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
425
426     RandomAccessFile raf;
427     FileChannel channel;
428     try {
429       raf = new RandomAccessFile(segmentFile, "rw");
430       raf.setLength(descriptor.maxSegmentSize());
431       channel =  raf.getChannel();
432     } catch (IOException e) {
433       throw new StorageException(e);
434     }
435
436     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
437     descriptor.copyTo(buffer);
438     buffer.flip();
439     try {
440       channel.write(buffer);
441     } catch (IOException e) {
442       throw new StorageException(e);
443     } finally {
444       try {
445         channel.close();
446         raf.close();
447       } catch (IOException e) {
448       }
449     }
450     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
451     log.debug("Created segment: {}", segment);
452     return segment;
453   }
454
455   /**
456    * Creates a new segment instance.
457    *
458    * @param segmentFile The segment file.
459    * @param descriptor The segment descriptor.
460    * @return The segment instance.
461    */
462   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
463     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
464   }
465
466   /**
467    * Loads a segment.
468    */
469   private JournalSegment<E> loadSegment(long segmentId) {
470     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
471     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
472     try (FileChannel channel = openChannel(segmentFile)) {
473       channel.read(buffer);
474       buffer.flip();
475       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
476       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
477       log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
478       return segment;
479     } catch (IOException e) {
480       throw new StorageException(e);
481     }
482   }
483
484   private FileChannel openChannel(File file) {
485     try {
486       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
487     } catch (IOException e) {
488       throw new StorageException(e);
489     }
490   }
491
492   /**
493    * Loads all segments from disk.
494    *
495    * @return A collection of segments for the log.
496    */
497   protected Collection<JournalSegment<E>> loadSegments() {
498     // Ensure log directories are created.
499     directory.mkdirs();
500
501     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
502
503     // Iterate through all files in the log directory.
504     for (File file : directory.listFiles(File::isFile)) {
505
506       // If the file looks like a segment file, attempt to load the segment.
507       if (JournalSegmentFile.isSegmentFile(name, file)) {
508         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
509         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
510         try (FileChannel channel = openChannel(file)) {
511           channel.read(buffer);
512           buffer.flip();
513         } catch (IOException e) {
514           throw new StorageException(e);
515         }
516
517         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
518
519         // Load the segment.
520         JournalSegment<E> segment = loadSegment(descriptor.id());
521
522         // Add the segment to the segments list.
523         log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
524         segments.put(segment.index(), segment);
525       }
526     }
527
528     // Verify that all the segments in the log align with one another.
529     JournalSegment<E> previousSegment = null;
530     boolean corrupted = false;
531     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
532     while (iterator.hasNext()) {
533       JournalSegment<E> segment = iterator.next().getValue();
534       if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
535         log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
536         corrupted = true;
537       }
538       if (corrupted) {
539         segment.close();
540         segment.delete();
541         iterator.remove();
542       }
543       previousSegment = segment;
544     }
545
546     return segments.values();
547   }
548
549   /**
550    * Resets journal readers to the given head.
551    *
552    * @param index The index at which to reset readers.
553    */
554   void resetHead(long index) {
555     for (SegmentedJournalReader<E> reader : readers) {
556       if (reader.getNextIndex() < index) {
557         reader.reset(index);
558       }
559     }
560   }
561
562   /**
563    * Resets journal readers to the given tail.
564    *
565    * @param index The index at which to reset readers.
566    */
567   void resetTail(long index) {
568     for (SegmentedJournalReader<E> reader : readers) {
569       if (reader.getNextIndex() >= index) {
570         reader.reset(index);
571       }
572     }
573   }
574
575   void closeReader(SegmentedJournalReader<E> reader) {
576     readers.remove(reader);
577   }
578
579   @Override
580   public boolean isOpen() {
581     return open;
582   }
583
584   /**
585    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
586    *
587    * @param index the index from which to remove segments
588    * @return indicates whether a segment can be removed from the journal
589    */
590   public boolean isCompactable(long index) {
591     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
592     return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
593   }
594
595   /**
596    * Returns the index of the last segment in the log.
597    *
598    * @param index the compaction index
599    * @return the starting index of the last segment in the log
600    */
601   public long getCompactableIndex(long index) {
602     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
603     return segmentEntry != null ? segmentEntry.getValue().index() : 0;
604   }
605
606   /**
607    * Compacts the journal up to the given index.
608    * <p>
609    * The semantics of compaction are not specified by this interface.
610    *
611    * @param index The index up to which to compact the journal.
612    */
613   public void compact(long index) {
614     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
615     if (segmentEntry != null) {
616       SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
617       if (!compactSegments.isEmpty()) {
618         log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
619         for (JournalSegment<E> segment : compactSegments.values()) {
620           log.trace("Deleting segment: {}", segment);
621           segment.close();
622           segment.delete();
623         }
624         compactSegments.clear();
625         resetHead(segmentEntry.getValue().index());
626       }
627     }
628   }
629
630   @Override
631   public void close() {
632     segments.values().forEach(segment -> {
633       log.debug("Closing segment: {}", segment);
634       segment.close();
635     });
636     currentSegment = null;
637     open = false;
638   }
639
640   /**
641    * Returns whether {@code flushOnCommit} is enabled for the log.
642    *
643    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
644    */
645   boolean isFlushOnCommit() {
646     return flushOnCommit;
647   }
648
649   /**
650    * Commits entries up to the given index.
651    *
652    * @param index The index up to which to commit entries.
653    */
654   void setCommitIndex(long index) {
655     this.commitIndex = index;
656   }
657
658   /**
659    * Returns the Raft log commit index.
660    *
661    * @return The Raft log commit index.
662    */
663   long getCommitIndex() {
664     return commitIndex;
665   }
666
667   /**
668    * Raft log builder.
669    */
670   public static class Builder<E> {
671     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
672     private static final String DEFAULT_NAME = "atomix";
673     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
674     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
675     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
676     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
677     private static final double DEFAULT_INDEX_DENSITY = .005;
678     private static final int DEFAULT_CACHE_SIZE = 1024;
679
680     protected String name = DEFAULT_NAME;
681     protected StorageLevel storageLevel = StorageLevel.DISK;
682     protected File directory = new File(DEFAULT_DIRECTORY);
683     protected Namespace namespace;
684     protected int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
685     protected int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
686     protected int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
687     protected double indexDensity = DEFAULT_INDEX_DENSITY;
688     protected int cacheSize = DEFAULT_CACHE_SIZE;
689     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
690
691     protected Builder() {
692     }
693
694     /**
695      * Sets the storage name.
696      *
697      * @param name The storage name.
698      * @return The storage builder.
699      */
700     public Builder<E> withName(String name) {
701       this.name = checkNotNull(name, "name cannot be null");
702       return this;
703     }
704
705     /**
706      * Sets the log storage level, returning the builder for method chaining.
707      * <p>
708      * The storage level indicates how individual entries should be persisted in the journal.
709      *
710      * @param storageLevel The log storage level.
711      * @return The storage builder.
712      */
713     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
714       this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
715       return this;
716     }
717
718     /**
719      * Sets the log directory, returning the builder for method chaining.
720      * <p>
721      * The log will write segment files into the provided directory.
722      *
723      * @param directory The log directory.
724      * @return The storage builder.
725      * @throws NullPointerException If the {@code directory} is {@code null}
726      */
727     public Builder<E> withDirectory(String directory) {
728       return withDirectory(new File(checkNotNull(directory, "directory cannot be null")));
729     }
730
731     /**
732      * Sets the log directory, returning the builder for method chaining.
733      * <p>
734      * The log will write segment files into the provided directory.
735      *
736      * @param directory The log directory.
737      * @return The storage builder.
738      * @throws NullPointerException If the {@code directory} is {@code null}
739      */
740     public Builder<E> withDirectory(File directory) {
741       this.directory = checkNotNull(directory, "directory cannot be null");
742       return this;
743     }
744
745     /**
746      * Sets the journal namespace, returning the builder for method chaining.
747      *
748      * @param namespace The journal serializer.
749      * @return The journal builder.
750      */
751     public Builder<E> withNamespace(Namespace namespace) {
752       this.namespace = checkNotNull(namespace, "namespace cannot be null");
753       return this;
754     }
755
756     /**
757      * Sets the maximum segment size in bytes, returning the builder for method chaining.
758      * <p>
759      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
760      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
761      * segment and append new entries to that segment.
762      * <p>
763      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
764      *
765      * @param maxSegmentSize The maximum segment size in bytes.
766      * @return The storage builder.
767      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
768      */
769     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
770       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
771       this.maxSegmentSize = maxSegmentSize;
772       return this;
773     }
774
775     /**
776      * Sets the maximum entry size in bytes, returning the builder for method chaining.
777      *
778      * @param maxEntrySize the maximum entry size in bytes
779      * @return the storage builder
780      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
781      */
782     public Builder<E> withMaxEntrySize(int maxEntrySize) {
783       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
784       this.maxEntrySize = maxEntrySize;
785       return this;
786     }
787
788     /**
789      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
790      * <p>
791      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
792      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
793      * new segment and append new entries to that segment.
794      * <p>
795      * By default, the maximum entries per segment is {@code 1024 * 1024}.
796      *
797      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
798      * @return The storage builder.
799      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
800      *     per segment
801      * @deprecated since 3.0.2
802      */
803     @Deprecated
804     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
805       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
806       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
807           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
808       this.maxEntriesPerSegment = maxEntriesPerSegment;
809       return this;
810     }
811
812     /**
813      * Sets the journal index density.
814      * <p>
815      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
816      * in-memory index for faster seeking.
817      *
818      * @param indexDensity the index density
819      * @return the journal builder
820      * @throws IllegalArgumentException if the density is not between 0 and 1
821      */
822     public Builder<E> withIndexDensity(double indexDensity) {
823       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
824       this.indexDensity = indexDensity;
825       return this;
826     }
827
828     /**
829      * Sets the journal cache size.
830      *
831      * @param cacheSize the journal cache size
832      * @return the journal builder
833      * @throws IllegalArgumentException if the cache size is not positive
834      * @deprecated since 3.0.4
835      */
836     @Deprecated
837     public Builder<E> withCacheSize(int cacheSize) {
838       checkArgument(cacheSize >= 0, "cacheSize must be positive");
839       this.cacheSize = cacheSize;
840       return this;
841     }
842
843     /**
844      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
845      * chaining.
846      * <p>
847      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
848      * committed in a given segment.
849      *
850      * @return The storage builder.
851      */
852     public Builder<E> withFlushOnCommit() {
853       return withFlushOnCommit(true);
854     }
855
856     /**
857      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
858      * chaining.
859      * <p>
860      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
861      * committed in a given segment.
862      *
863      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
864      * @return The storage builder.
865      */
866     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
867       this.flushOnCommit = flushOnCommit;
868       return this;
869     }
870
871     /**
872      * Build the {@link SegmentedJournal}.
873      *
874      * @return A new {@link SegmentedJournal}.
875      */
876     public SegmentedJournal<E> build() {
877       return new SegmentedJournal<>(
878           name,
879           storageLevel,
880           directory,
881           namespace,
882           maxSegmentSize,
883           maxEntrySize,
884           maxEntriesPerSegment,
885           indexDensity,
886           flushOnCommit);
887     }
888   }
889 }