Synchronize SegmentedJournal.open()
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.io.File;
19 import java.io.IOException;
20 import java.io.RandomAccessFile;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.nio.file.StandardOpenOption;
24 import java.util.Collection;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.NavigableMap;
28 import java.util.SortedMap;
29 import java.util.TreeMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31
32 import com.google.common.collect.Sets;
33 import io.atomix.utils.serializer.Namespace;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import static com.google.common.base.Preconditions.checkArgument;
38 import static com.google.common.base.Preconditions.checkState;
39 import static java.util.Objects.requireNonNull;
40
41 /**
42  * Segmented journal.
43  */
44 public final class SegmentedJournal<E> implements Journal<E> {
45
46   /**
47    * Returns a new Raft log builder.
48    *
49    * @return A new Raft log builder.
50    */
51   public static <E> Builder<E> builder() {
52     return new Builder<>();
53   }
54
55   private static final int SEGMENT_BUFFER_FACTOR = 3;
56
57   private final Logger log = LoggerFactory.getLogger(getClass());
58   private final String name;
59   private final StorageLevel storageLevel;
60   private final File directory;
61   private final Namespace namespace;
62   private final int maxSegmentSize;
63   private final int maxEntrySize;
64   private final int maxEntriesPerSegment;
65   private final double indexDensity;
66   private final boolean flushOnCommit;
67   private final SegmentedJournalWriter<E> writer;
68   private volatile long commitIndex;
69
70   private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
71   private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
72   private JournalSegment<E> currentSegment;
73
74   private volatile boolean open = true;
75
76   public SegmentedJournal(
77       String name,
78       StorageLevel storageLevel,
79       File directory,
80       Namespace namespace,
81       int maxSegmentSize,
82       int maxEntrySize,
83       int maxEntriesPerSegment,
84       double indexDensity,
85       boolean flushOnCommit) {
86     this.name = requireNonNull(name, "name cannot be null");
87     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
88     this.directory = requireNonNull(directory, "directory cannot be null");
89     this.namespace = requireNonNull(namespace, "namespace cannot be null");
90     this.maxSegmentSize = maxSegmentSize;
91     this.maxEntrySize = maxEntrySize;
92     this.maxEntriesPerSegment = maxEntriesPerSegment;
93     this.indexDensity = indexDensity;
94     this.flushOnCommit = flushOnCommit;
95     open();
96     this.writer = openWriter();
97   }
98
99   /**
100    * Returns the segment file name prefix.
101    *
102    * @return The segment file name prefix.
103    */
104   public String name() {
105     return name;
106   }
107
108   /**
109    * Returns the storage directory.
110    * <p>
111    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
112    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
113    * when the log is opened.
114    *
115    * @return The storage directory.
116    */
117   public File directory() {
118     return directory;
119   }
120
121   /**
122    * Returns the storage level.
123    * <p>
124    * The storage level dictates how entries within individual journal segments should be stored.
125    *
126    * @return The storage level.
127    */
128   public StorageLevel storageLevel() {
129     return storageLevel;
130   }
131
132   /**
133    * Returns the maximum journal segment size.
134    * <p>
135    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
136    *
137    * @return The maximum segment size in bytes.
138    */
139   public int maxSegmentSize() {
140     return maxSegmentSize;
141   }
142
143   /**
144    * Returns the maximum journal entry size.
145    * <p>
146    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
147    *
148    * @return the maximum entry size in bytes
149    */
150   public int maxEntrySize() {
151     return maxEntrySize;
152   }
153
154   /**
155    * Returns the maximum number of entries per segment.
156    * <p>
157    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
158    * in a journal.
159    *
160    * @return The maximum number of entries per segment.
161    * @deprecated since 3.0.2
162    */
163   @Deprecated
164   public int maxEntriesPerSegment() {
165     return maxEntriesPerSegment;
166   }
167
168   /**
169    * Returns the collection of journal segments.
170    *
171    * @return the collection of journal segments
172    */
173   public Collection<JournalSegment<E>> segments() {
174     return segments.values();
175   }
176
177   /**
178    * Returns the collection of journal segments with indexes greater than the given index.
179    *
180    * @param index the starting index
181    * @return the journal segments starting with indexes greater than or equal to the given index
182    */
183   public Collection<JournalSegment<E>> segments(long index) {
184     return segments.tailMap(index).values();
185   }
186
187   /**
188    * Returns the total size of the journal.
189    *
190    * @return the total size of the journal
191    */
192   public long size() {
193     return segments.values().stream()
194         .mapToLong(segment -> segment.size())
195         .sum();
196   }
197
198   @Override
199   public SegmentedJournalWriter<E> writer() {
200     return writer;
201   }
202
203   @Override
204   public SegmentedJournalReader<E> openReader(long index) {
205     return openReader(index, SegmentedJournalReader.Mode.ALL);
206   }
207
208   /**
209    * Opens a new Raft log reader with the given reader mode.
210    *
211    * @param index The index from which to begin reading entries.
212    * @param mode The mode in which to read entries.
213    * @return The Raft log reader.
214    */
215   public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
216     SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
217     readers.add(reader);
218     return reader;
219   }
220
221   /**
222    * Opens a new journal writer.
223    *
224    * @return A new journal writer.
225    */
226   protected SegmentedJournalWriter<E> openWriter() {
227     return new SegmentedJournalWriter<>(this);
228   }
229
230   /**
231    * Opens the segments.
232    */
233   private synchronized void open() {
234     // Load existing log segments from disk.
235     for (JournalSegment<E> segment : loadSegments()) {
236       segments.put(segment.descriptor().index(), segment);
237     }
238
239     // If a segment doesn't already exist, create an initial segment starting at index 1.
240     if (!segments.isEmpty()) {
241       currentSegment = segments.lastEntry().getValue();
242     } else {
243       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
244           .withId(1)
245           .withIndex(1)
246           .withMaxSegmentSize(maxSegmentSize)
247           .withMaxEntries(maxEntriesPerSegment)
248           .build();
249
250       currentSegment = createSegment(descriptor);
251       currentSegment.descriptor().update(System.currentTimeMillis());
252
253       segments.put(1L, currentSegment);
254     }
255   }
256
257   /**
258    * Asserts that the manager is open.
259    *
260    * @throws IllegalStateException if the segment manager is not open
261    */
262   private void assertOpen() {
263     checkState(currentSegment != null, "journal not open");
264   }
265
266   /**
267    * Asserts that enough disk space is available to allocate a new segment.
268    */
269   private void assertDiskSpace() {
270     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
271       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
272     }
273   }
274
275   /**
276    * Resets the current segment, creating a new segment if necessary.
277    */
278   private synchronized void resetCurrentSegment() {
279     JournalSegment<E> lastSegment = getLastSegment();
280     if (lastSegment != null) {
281       currentSegment = lastSegment;
282     } else {
283       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
284           .withId(1)
285           .withIndex(1)
286           .withMaxSegmentSize(maxSegmentSize)
287           .withMaxEntries(maxEntriesPerSegment)
288           .build();
289
290       currentSegment = createSegment(descriptor);
291
292       segments.put(1L, currentSegment);
293     }
294   }
295
296   /**
297    * Resets and returns the first segment in the journal.
298    *
299    * @param index the starting index of the journal
300    * @return the first segment
301    */
302   JournalSegment<E> resetSegments(long index) {
303     assertOpen();
304
305     // If the index already equals the first segment index, skip the reset.
306     JournalSegment<E> firstSegment = getFirstSegment();
307     if (index == firstSegment.index()) {
308       return firstSegment;
309     }
310
311     for (JournalSegment<E> segment : segments.values()) {
312       segment.close();
313       segment.delete();
314     }
315     segments.clear();
316
317     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
318         .withId(1)
319         .withIndex(index)
320         .withMaxSegmentSize(maxSegmentSize)
321         .withMaxEntries(maxEntriesPerSegment)
322         .build();
323     currentSegment = createSegment(descriptor);
324     segments.put(index, currentSegment);
325     return currentSegment;
326   }
327
328   /**
329    * Returns the first segment in the log.
330    *
331    * @throws IllegalStateException if the segment manager is not open
332    */
333   JournalSegment<E> getFirstSegment() {
334     assertOpen();
335     Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
336     return segment != null ? segment.getValue() : null;
337   }
338
339   /**
340    * Returns the last segment in the log.
341    *
342    * @throws IllegalStateException if the segment manager is not open
343    */
344   JournalSegment<E> getLastSegment() {
345     assertOpen();
346     Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
347     return segment != null ? segment.getValue() : null;
348   }
349
350   /**
351    * Creates and returns the next segment.
352    *
353    * @return The next segment.
354    * @throws IllegalStateException if the segment manager is not open
355    */
356   synchronized JournalSegment<E> getNextSegment() {
357     assertOpen();
358     assertDiskSpace();
359
360     JournalSegment<E> lastSegment = getLastSegment();
361     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
362         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
363         .withIndex(currentSegment.lastIndex() + 1)
364         .withMaxSegmentSize(maxSegmentSize)
365         .withMaxEntries(maxEntriesPerSegment)
366         .build();
367
368     currentSegment = createSegment(descriptor);
369
370     segments.put(descriptor.index(), currentSegment);
371     return currentSegment;
372   }
373
374   /**
375    * Returns the segment following the segment with the given ID.
376    *
377    * @param index The segment index with which to look up the next segment.
378    * @return The next segment for the given index.
379    */
380   JournalSegment<E> getNextSegment(long index) {
381     Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
382     return nextSegment != null ? nextSegment.getValue() : null;
383   }
384
385   /**
386    * Returns the segment for the given index.
387    *
388    * @param index The index for which to return the segment.
389    * @throws IllegalStateException if the segment manager is not open
390    */
391   synchronized JournalSegment<E> getSegment(long index) {
392     assertOpen();
393     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
394     if (currentSegment != null && index > currentSegment.index()) {
395       return currentSegment;
396     }
397
398     // If the index is in another segment, get the entry with the next lowest first index.
399     Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
400     if (segment != null) {
401       return segment.getValue();
402     }
403     return getFirstSegment();
404   }
405
406   /**
407    * Removes a segment.
408    *
409    * @param segment The segment to remove.
410    */
411   synchronized void removeSegment(JournalSegment<E> segment) {
412     segments.remove(segment.index());
413     segment.close();
414     segment.delete();
415     resetCurrentSegment();
416   }
417
418   /**
419    * Creates a new segment.
420    */
421   JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
422     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
423
424     RandomAccessFile raf;
425     FileChannel channel;
426     try {
427       raf = new RandomAccessFile(segmentFile, "rw");
428       raf.setLength(descriptor.maxSegmentSize());
429       channel =  raf.getChannel();
430     } catch (IOException e) {
431       throw new StorageException(e);
432     }
433
434     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
435     descriptor.copyTo(buffer);
436     buffer.flip();
437     try {
438       channel.write(buffer);
439     } catch (IOException e) {
440       throw new StorageException(e);
441     } finally {
442       try {
443         channel.close();
444         raf.close();
445       } catch (IOException e) {
446       }
447     }
448     JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
449     log.debug("Created segment: {}", segment);
450     return segment;
451   }
452
453   /**
454    * Creates a new segment instance.
455    *
456    * @param segmentFile The segment file.
457    * @param descriptor The segment descriptor.
458    * @return The segment instance.
459    */
460   protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
461     return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
462   }
463
464   /**
465    * Loads a segment.
466    */
467   private JournalSegment<E> loadSegment(long segmentId) {
468     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
469     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
470     try (FileChannel channel = openChannel(segmentFile)) {
471       channel.read(buffer);
472       buffer.flip();
473       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
474       JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
475       log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
476       return segment;
477     } catch (IOException e) {
478       throw new StorageException(e);
479     }
480   }
481
482   private FileChannel openChannel(File file) {
483     try {
484       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
485     } catch (IOException e) {
486       throw new StorageException(e);
487     }
488   }
489
490   /**
491    * Loads all segments from disk.
492    *
493    * @return A collection of segments for the log.
494    */
495   protected Collection<JournalSegment<E>> loadSegments() {
496     // Ensure log directories are created.
497     directory.mkdirs();
498
499     TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
500
501     // Iterate through all files in the log directory.
502     for (File file : directory.listFiles(File::isFile)) {
503
504       // If the file looks like a segment file, attempt to load the segment.
505       if (JournalSegmentFile.isSegmentFile(name, file)) {
506         JournalSegmentFile segmentFile = new JournalSegmentFile(file);
507         ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
508         try (FileChannel channel = openChannel(file)) {
509           channel.read(buffer);
510           buffer.flip();
511         } catch (IOException e) {
512           throw new StorageException(e);
513         }
514
515         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
516
517         // Load the segment.
518         JournalSegment<E> segment = loadSegment(descriptor.id());
519
520         // Add the segment to the segments list.
521         log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
522         segments.put(segment.index(), segment);
523       }
524     }
525
526     // Verify that all the segments in the log align with one another.
527     JournalSegment<E> previousSegment = null;
528     boolean corrupted = false;
529     Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
530     while (iterator.hasNext()) {
531       JournalSegment<E> segment = iterator.next().getValue();
532       if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
533         log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
534         corrupted = true;
535       }
536       if (corrupted) {
537         segment.close();
538         segment.delete();
539         iterator.remove();
540       }
541       previousSegment = segment;
542     }
543
544     return segments.values();
545   }
546
547   /**
548    * Resets journal readers to the given head.
549    *
550    * @param index The index at which to reset readers.
551    */
552   void resetHead(long index) {
553     for (SegmentedJournalReader<E> reader : readers) {
554       if (reader.getNextIndex() < index) {
555         reader.reset(index);
556       }
557     }
558   }
559
560   /**
561    * Resets journal readers to the given tail.
562    *
563    * @param index The index at which to reset readers.
564    */
565   void resetTail(long index) {
566     for (SegmentedJournalReader<E> reader : readers) {
567       if (reader.getNextIndex() >= index) {
568         reader.reset(index);
569       }
570     }
571   }
572
573   void closeReader(SegmentedJournalReader<E> reader) {
574     readers.remove(reader);
575   }
576
577   @Override
578   public boolean isOpen() {
579     return open;
580   }
581
582   /**
583    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
584    *
585    * @param index the index from which to remove segments
586    * @return indicates whether a segment can be removed from the journal
587    */
588   public boolean isCompactable(long index) {
589     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
590     return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
591   }
592
593   /**
594    * Returns the index of the last segment in the log.
595    *
596    * @param index the compaction index
597    * @return the starting index of the last segment in the log
598    */
599   public long getCompactableIndex(long index) {
600     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
601     return segmentEntry != null ? segmentEntry.getValue().index() : 0;
602   }
603
604   /**
605    * Compacts the journal up to the given index.
606    * <p>
607    * The semantics of compaction are not specified by this interface.
608    *
609    * @param index The index up to which to compact the journal.
610    */
611   public void compact(long index) {
612     Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
613     if (segmentEntry != null) {
614       SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
615       if (!compactSegments.isEmpty()) {
616         log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
617         for (JournalSegment<E> segment : compactSegments.values()) {
618           log.trace("Deleting segment: {}", segment);
619           segment.close();
620           segment.delete();
621         }
622         compactSegments.clear();
623         resetHead(segmentEntry.getValue().index());
624       }
625     }
626   }
627
628   @Override
629   public void close() {
630     segments.values().forEach(segment -> {
631       log.debug("Closing segment: {}", segment);
632       segment.close();
633     });
634     currentSegment = null;
635     open = false;
636   }
637
638   /**
639    * Returns whether {@code flushOnCommit} is enabled for the log.
640    *
641    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
642    */
643   boolean isFlushOnCommit() {
644     return flushOnCommit;
645   }
646
647   /**
648    * Commits entries up to the given index.
649    *
650    * @param index The index up to which to commit entries.
651    */
652   void setCommitIndex(long index) {
653     this.commitIndex = index;
654   }
655
656   /**
657    * Returns the Raft log commit index.
658    *
659    * @return The Raft log commit index.
660    */
661   long getCommitIndex() {
662     return commitIndex;
663   }
664
665   /**
666    * Raft log builder.
667    */
668   public static final class Builder<E> {
669     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
670     private static final String DEFAULT_NAME = "atomix";
671     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
672     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
673     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
674     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
675     private static final double DEFAULT_INDEX_DENSITY = .005;
676
677     private String name = DEFAULT_NAME;
678     private StorageLevel storageLevel = StorageLevel.DISK;
679     private File directory = new File(DEFAULT_DIRECTORY);
680     private Namespace namespace;
681     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
682     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
683     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
684     private double indexDensity = DEFAULT_INDEX_DENSITY;
685     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
686
687     protected Builder() {
688     }
689
690     /**
691      * Sets the storage name.
692      *
693      * @param name The storage name.
694      * @return The storage builder.
695      */
696     public Builder<E> withName(String name) {
697       this.name = requireNonNull(name, "name cannot be null");
698       return this;
699     }
700
701     /**
702      * Sets the log storage level, returning the builder for method chaining.
703      * <p>
704      * The storage level indicates how individual entries should be persisted in the journal.
705      *
706      * @param storageLevel The log storage level.
707      * @return The storage builder.
708      */
709     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
710       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
711       return this;
712     }
713
714     /**
715      * Sets the log directory, returning the builder for method chaining.
716      * <p>
717      * The log will write segment files into the provided directory.
718      *
719      * @param directory The log directory.
720      * @return The storage builder.
721      * @throws NullPointerException If the {@code directory} is {@code null}
722      */
723     public Builder<E> withDirectory(String directory) {
724       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
725     }
726
727     /**
728      * Sets the log directory, returning the builder for method chaining.
729      * <p>
730      * The log will write segment files into the provided directory.
731      *
732      * @param directory The log directory.
733      * @return The storage builder.
734      * @throws NullPointerException If the {@code directory} is {@code null}
735      */
736     public Builder<E> withDirectory(File directory) {
737       this.directory = requireNonNull(directory, "directory cannot be null");
738       return this;
739     }
740
741     /**
742      * Sets the journal namespace, returning the builder for method chaining.
743      *
744      * @param namespace The journal serializer.
745      * @return The journal builder.
746      */
747     public Builder<E> withNamespace(Namespace namespace) {
748       this.namespace = requireNonNull(namespace, "namespace cannot be null");
749       return this;
750     }
751
752     /**
753      * Sets the maximum segment size in bytes, returning the builder for method chaining.
754      * <p>
755      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
756      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
757      * segment and append new entries to that segment.
758      * <p>
759      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
760      *
761      * @param maxSegmentSize The maximum segment size in bytes.
762      * @return The storage builder.
763      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
764      */
765     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
766       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
767       this.maxSegmentSize = maxSegmentSize;
768       return this;
769     }
770
771     /**
772      * Sets the maximum entry size in bytes, returning the builder for method chaining.
773      *
774      * @param maxEntrySize the maximum entry size in bytes
775      * @return the storage builder
776      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
777      */
778     public Builder<E> withMaxEntrySize(int maxEntrySize) {
779       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
780       this.maxEntrySize = maxEntrySize;
781       return this;
782     }
783
784     /**
785      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
786      * <p>
787      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
788      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
789      * new segment and append new entries to that segment.
790      * <p>
791      * By default, the maximum entries per segment is {@code 1024 * 1024}.
792      *
793      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
794      * @return The storage builder.
795      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
796      *     per segment
797      * @deprecated since 3.0.2
798      */
799     @Deprecated
800     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
801       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
802       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
803           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
804       this.maxEntriesPerSegment = maxEntriesPerSegment;
805       return this;
806     }
807
808     /**
809      * Sets the journal index density.
810      * <p>
811      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
812      * in-memory index for faster seeking.
813      *
814      * @param indexDensity the index density
815      * @return the journal builder
816      * @throws IllegalArgumentException if the density is not between 0 and 1
817      */
818     public Builder<E> withIndexDensity(double indexDensity) {
819       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
820       this.indexDensity = indexDensity;
821       return this;
822     }
823
824     /**
825      * Sets the journal cache size.
826      *
827      * @param cacheSize the journal cache size
828      * @return the journal builder
829      * @throws IllegalArgumentException if the cache size is not positive
830      * @deprecated since 3.0.4
831      */
832     @Deprecated
833     public Builder<E> withCacheSize(int cacheSize) {
834       checkArgument(cacheSize >= 0, "cacheSize must be positive");
835       return this;
836     }
837
838     /**
839      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
840      * chaining.
841      * <p>
842      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
843      * committed in a given segment.
844      *
845      * @return The storage builder.
846      */
847     public Builder<E> withFlushOnCommit() {
848       return withFlushOnCommit(true);
849     }
850
851     /**
852      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
853      * chaining.
854      * <p>
855      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
856      * committed in a given segment.
857      *
858      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
859      * @return The storage builder.
860      */
861     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
862       this.flushOnCommit = flushOnCommit;
863       return this;
864     }
865
866     /**
867      * Build the {@link SegmentedJournal}.
868      *
869      * @return A new {@link SegmentedJournal}.
870      */
871     public SegmentedJournal<E> build() {
872       return new SegmentedJournal<>(
873           name,
874           storageLevel,
875           directory,
876           namespace,
877           maxSegmentSize,
878           maxEntrySize,
879           maxEntriesPerSegment,
880           indexDensity,
881           flushOnCommit);
882     }
883   }
884 }