Split out JournalSegmentDescriptor.readFrom()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.StandardOpenOption;
28 import java.util.Collection;
29 import java.util.Map;
30 import java.util.TreeMap;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentNavigableMap;
33 import java.util.concurrent.ConcurrentSkipListMap;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Segmented journal.
39  */
40 public final class SegmentedJournal<E> implements Journal<E> {
41   /**
42    * Returns a new Raft log builder.
43    *
44    * @return A new Raft log builder.
45    */
46   public static <E> Builder<E> builder() {
47     return new Builder<>();
48   }
49
50   private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
51   private static final int SEGMENT_BUFFER_FACTOR = 3;
52
53   private final String name;
54   private final StorageLevel storageLevel;
55   private final File directory;
56   private final JournalSerializer<E> serializer;
57   private final int maxSegmentSize;
58   private final int maxEntrySize;
59   private final int maxEntriesPerSegment;
60   private final double indexDensity;
61   private final boolean flushOnCommit;
62   private final SegmentedJournalWriter<E> writer;
63   private volatile long commitIndex;
64
65   private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
66   private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
67   private JournalSegment currentSegment;
68
69   private volatile boolean open = true;
70
71   public SegmentedJournal(
72       String name,
73       StorageLevel storageLevel,
74       File directory,
75       JournalSerdes namespace,
76       int maxSegmentSize,
77       int maxEntrySize,
78       int maxEntriesPerSegment,
79       double indexDensity,
80       boolean flushOnCommit) {
81     this.name = requireNonNull(name, "name cannot be null");
82     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
83     this.directory = requireNonNull(directory, "directory cannot be null");
84     this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
85     this.maxSegmentSize = maxSegmentSize;
86     this.maxEntrySize = maxEntrySize;
87     this.maxEntriesPerSegment = maxEntriesPerSegment;
88     this.indexDensity = indexDensity;
89     this.flushOnCommit = flushOnCommit;
90     open();
91     this.writer = new SegmentedJournalWriter<>(this);
92   }
93
94   /**
95    * Returns the segment file name prefix.
96    *
97    * @return The segment file name prefix.
98    */
99   public String name() {
100     return name;
101   }
102
103   /**
104    * Returns the storage directory.
105    * <p>
106    * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
107    * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
108    * when the log is opened.
109    *
110    * @return The storage directory.
111    */
112   public File directory() {
113     return directory;
114   }
115
116   /**
117    * Returns the storage level.
118    * <p>
119    * The storage level dictates how entries within individual journal segments should be stored.
120    *
121    * @return The storage level.
122    */
123   public StorageLevel storageLevel() {
124     return storageLevel;
125   }
126
127   /**
128    * Returns the maximum journal segment size.
129    * <p>
130    * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
131    *
132    * @return The maximum segment size in bytes.
133    */
134   public int maxSegmentSize() {
135     return maxSegmentSize;
136   }
137
138   /**
139    * Returns the maximum journal entry size.
140    * <p>
141    * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
142    *
143    * @return the maximum entry size in bytes
144    */
145   public int maxEntrySize() {
146     return maxEntrySize;
147   }
148
149   /**
150    * Returns the maximum number of entries per segment.
151    * <p>
152    * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
153    * in a journal.
154    *
155    * @return The maximum number of entries per segment.
156    * @deprecated since 3.0.2
157    */
158   @Deprecated
159   public int maxEntriesPerSegment() {
160     return maxEntriesPerSegment;
161   }
162
163   /**
164    * Returns the collection of journal segments.
165    *
166    * @return the collection of journal segments
167    */
168   public Collection<JournalSegment> segments() {
169     return segments.values();
170   }
171
172   /**
173    * Returns the collection of journal segments with indexes greater than the given index.
174    *
175    * @param index the starting index
176    * @return the journal segments starting with indexes greater than or equal to the given index
177    */
178   public Collection<JournalSegment> segments(long index) {
179     return segments.tailMap(index).values();
180   }
181
182   /**
183    * Returns serializer instance.
184    *
185    * @return serializer instance
186    */
187   JournalSerializer<E> serializer() {
188     return serializer;
189   }
190
191   /**
192    * Returns the total size of the journal.
193    *
194    * @return the total size of the journal
195    */
196   public long size() {
197     return segments.values().stream()
198         .mapToLong(segment -> segment.size())
199         .sum();
200   }
201
202   @Override
203   public JournalWriter<E> writer() {
204     return writer;
205   }
206
207   @Override
208   public JournalReader<E> openReader(long index) {
209     return openReader(index, JournalReader.Mode.ALL);
210   }
211
212   /**
213    * Opens a new Raft log reader with the given reader mode.
214    *
215    * @param index The index from which to begin reading entries.
216    * @param mode The mode in which to read entries.
217    * @return The Raft log reader.
218    */
219   public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
220     final var segment = getSegment(index);
221     final var reader = switch (mode) {
222       case ALL -> new SegmentedJournalReader<>(this, segment);
223       case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
224     };
225
226     // Forward reader to specified index
227     long next = reader.getNextIndex();
228     while (index > next && reader.tryAdvance()) {
229         next = reader.getNextIndex();
230     }
231
232     readers.add(reader);
233     return reader;
234   }
235
236   /**
237    * Opens the segments.
238    */
239   private synchronized void open() {
240     // Load existing log segments from disk.
241     for (JournalSegment segment : loadSegments()) {
242       segments.put(segment.descriptor().index(), segment);
243     }
244
245     // If a segment doesn't already exist, create an initial segment starting at index 1.
246     if (!segments.isEmpty()) {
247       currentSegment = segments.lastEntry().getValue();
248     } else {
249       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
250           .withId(1)
251           .withIndex(1)
252           .withMaxSegmentSize(maxSegmentSize)
253           .withMaxEntries(maxEntriesPerSegment)
254           .build();
255
256       currentSegment = createSegment(descriptor);
257       currentSegment.descriptor().update(System.currentTimeMillis());
258
259       segments.put(1L, currentSegment);
260     }
261   }
262
263   /**
264    * Asserts that the manager is open.
265    *
266    * @throws IllegalStateException if the segment manager is not open
267    */
268   private void assertOpen() {
269     checkState(currentSegment != null, "journal not open");
270   }
271
272   /**
273    * Asserts that enough disk space is available to allocate a new segment.
274    */
275   private void assertDiskSpace() {
276     if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
277       throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
278     }
279   }
280
281   /**
282    * Resets the current segment, creating a new segment if necessary.
283    */
284   private synchronized void resetCurrentSegment() {
285     JournalSegment lastSegment = getLastSegment();
286     if (lastSegment != null) {
287       currentSegment = lastSegment;
288     } else {
289       JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
290           .withId(1)
291           .withIndex(1)
292           .withMaxSegmentSize(maxSegmentSize)
293           .withMaxEntries(maxEntriesPerSegment)
294           .build();
295
296       currentSegment = createSegment(descriptor);
297
298       segments.put(1L, currentSegment);
299     }
300   }
301
302   /**
303    * Resets and returns the first segment in the journal.
304    *
305    * @param index the starting index of the journal
306    * @return the first segment
307    */
308   JournalSegment resetSegments(long index) {
309     assertOpen();
310
311     // If the index already equals the first segment index, skip the reset.
312     JournalSegment firstSegment = getFirstSegment();
313     if (index == firstSegment.firstIndex()) {
314       return firstSegment;
315     }
316
317     for (JournalSegment segment : segments.values()) {
318       segment.close();
319       segment.delete();
320     }
321     segments.clear();
322
323     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
324         .withId(1)
325         .withIndex(index)
326         .withMaxSegmentSize(maxSegmentSize)
327         .withMaxEntries(maxEntriesPerSegment)
328         .build();
329     currentSegment = createSegment(descriptor);
330     segments.put(index, currentSegment);
331     return currentSegment;
332   }
333
334   /**
335    * Returns the first segment in the log.
336    *
337    * @throws IllegalStateException if the segment manager is not open
338    */
339   JournalSegment getFirstSegment() {
340     assertOpen();
341     Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
342     return segment != null ? segment.getValue() : null;
343   }
344
345   /**
346    * Returns the last segment in the log.
347    *
348    * @throws IllegalStateException if the segment manager is not open
349    */
350   JournalSegment getLastSegment() {
351     assertOpen();
352     Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
353     return segment != null ? segment.getValue() : null;
354   }
355
356   /**
357    * Creates and returns the next segment.
358    *
359    * @return The next segment.
360    * @throws IllegalStateException if the segment manager is not open
361    */
362   synchronized JournalSegment getNextSegment() {
363     assertOpen();
364     assertDiskSpace();
365
366     JournalSegment lastSegment = getLastSegment();
367     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
368         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
369         .withIndex(currentSegment.lastIndex() + 1)
370         .withMaxSegmentSize(maxSegmentSize)
371         .withMaxEntries(maxEntriesPerSegment)
372         .build();
373
374     currentSegment = createSegment(descriptor);
375
376     segments.put(descriptor.index(), currentSegment);
377     return currentSegment;
378   }
379
380   /**
381    * Returns the segment following the segment with the given ID.
382    *
383    * @param index The segment index with which to look up the next segment.
384    * @return The next segment for the given index.
385    */
386   JournalSegment getNextSegment(long index) {
387     Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
388     return nextSegment != null ? nextSegment.getValue() : null;
389   }
390
391   /**
392    * Returns the segment for the given index.
393    *
394    * @param index The index for which to return the segment.
395    * @throws IllegalStateException if the segment manager is not open
396    */
397   synchronized JournalSegment getSegment(long index) {
398     assertOpen();
399     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
400     if (currentSegment != null && index > currentSegment.firstIndex()) {
401       return currentSegment;
402     }
403
404     // If the index is in another segment, get the entry with the next lowest first index.
405     Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
406     if (segment != null) {
407       return segment.getValue();
408     }
409     return getFirstSegment();
410   }
411
412   /**
413    * Removes a segment.
414    *
415    * @param segment The segment to remove.
416    */
417   synchronized void removeSegment(JournalSegment segment) {
418     segments.remove(segment.firstIndex());
419     segment.close();
420     segment.delete();
421     resetCurrentSegment();
422   }
423
424   /**
425    * Creates a new segment.
426    */
427   JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
428     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
429
430     RandomAccessFile raf;
431     FileChannel channel;
432     try {
433       raf = new RandomAccessFile(segmentFile, "rw");
434       raf.setLength(descriptor.maxSegmentSize());
435       channel =  raf.getChannel();
436     } catch (IOException e) {
437       throw new StorageException(e);
438     }
439
440     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
441     descriptor.copyTo(buffer);
442     buffer.flip();
443     try {
444       channel.write(buffer);
445     } catch (IOException e) {
446       throw new StorageException(e);
447     } finally {
448       try {
449         channel.close();
450         raf.close();
451       } catch (IOException e) {
452       }
453     }
454     JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
455     LOG.debug("Created segment: {}", segment);
456     return segment;
457   }
458
459   /**
460    * Creates a new segment instance.
461    *
462    * @param segmentFile The segment file.
463    * @param descriptor The segment descriptor.
464    * @return The segment instance.
465    */
466   protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
467     return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
468   }
469
470   /**
471    * Loads all segments from disk.
472    *
473    * @return A collection of segments for the log.
474    */
475   protected Collection<JournalSegment> loadSegments() {
476     // Ensure log directories are created.
477     directory.mkdirs();
478
479     final var segments = new TreeMap<Long, JournalSegment>();
480
481     // Iterate through all files in the log directory.
482     for (var file : directory.listFiles(File::isFile)) {
483
484       // If the file looks like a segment file, attempt to load the segment.
485       if (JournalSegmentFile.isSegmentFile(name, file)) {
486         // read the descriptor
487         final JournalSegmentDescriptor descriptor;
488         try {
489           descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
490         } catch (IOException e) {
491           throw new StorageException(e);
492         }
493
494         // Load the segment.
495         final var segmentFile = new JournalSegmentFile(file);
496         final var segment = newSegment(segmentFile, descriptor);
497         LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
498
499         // Add the segment to the segments list.
500         segments.put(segment.firstIndex(), segment);
501       }
502     }
503
504     // Verify that all the segments in the log align with one another.
505     JournalSegment previousSegment = null;
506     boolean corrupted = false;
507     final var iterator = segments.entrySet().iterator();
508     while (iterator.hasNext()) {
509       final var segment = iterator.next().getValue();
510       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
511         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
512             previousSegment.file().file());
513         corrupted = true;
514       }
515       if (corrupted) {
516         segment.close();
517         segment.delete();
518         iterator.remove();
519       }
520       previousSegment = segment;
521     }
522
523     return segments.values();
524   }
525
526   /**
527    * Resets journal readers to the given head.
528    *
529    * @param index The index at which to reset readers.
530    */
531   void resetHead(long index) {
532     for (SegmentedJournalReader<E> reader : readers) {
533       if (reader.getNextIndex() < index) {
534         reader.reset(index);
535       }
536     }
537   }
538
539   /**
540    * Resets journal readers to the given tail.
541    *
542    * @param index The index at which to reset readers.
543    */
544   void resetTail(long index) {
545     for (SegmentedJournalReader<E> reader : readers) {
546       if (reader.getNextIndex() >= index) {
547         reader.reset(index);
548       }
549     }
550   }
551
552   void closeReader(SegmentedJournalReader<E> reader) {
553     readers.remove(reader);
554   }
555
556   @Override
557   public boolean isOpen() {
558     return open;
559   }
560
561   /**
562    * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
563    *
564    * @param index the index from which to remove segments
565    * @return indicates whether a segment can be removed from the journal
566    */
567   public boolean isCompactable(long index) {
568     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
569     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
570   }
571
572   /**
573    * Returns the index of the last segment in the log.
574    *
575    * @param index the compaction index
576    * @return the starting index of the last segment in the log
577    */
578   public long getCompactableIndex(long index) {
579     Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
580     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
581   }
582
583   /**
584    * Compacts the journal up to the given index.
585    * <p>
586    * The semantics of compaction are not specified by this interface.
587    *
588    * @param index The index up to which to compact the journal.
589    */
590   public void compact(long index) {
591     final var segmentEntry = segments.floorEntry(index);
592     if (segmentEntry != null) {
593       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
594       if (!compactSegments.isEmpty()) {
595         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
596         for (JournalSegment segment : compactSegments.values()) {
597           LOG.trace("Deleting segment: {}", segment);
598           segment.close();
599           segment.delete();
600         }
601         compactSegments.clear();
602         resetHead(segmentEntry.getValue().firstIndex());
603       }
604     }
605   }
606
607   @Override
608   public void close() {
609     segments.values().forEach(segment -> {
610       LOG.debug("Closing segment: {}", segment);
611       segment.close();
612     });
613     currentSegment = null;
614     open = false;
615   }
616
617   /**
618    * Returns whether {@code flushOnCommit} is enabled for the log.
619    *
620    * @return Indicates whether {@code flushOnCommit} is enabled for the log.
621    */
622   boolean isFlushOnCommit() {
623     return flushOnCommit;
624   }
625
626   /**
627    * Commits entries up to the given index.
628    *
629    * @param index The index up to which to commit entries.
630    */
631   void setCommitIndex(long index) {
632     this.commitIndex = index;
633   }
634
635   /**
636    * Returns the Raft log commit index.
637    *
638    * @return The Raft log commit index.
639    */
640   long getCommitIndex() {
641     return commitIndex;
642   }
643
644   /**
645    * Raft log builder.
646    */
647   public static final class Builder<E> {
648     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
649     private static final String DEFAULT_NAME = "atomix";
650     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
651     private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
652     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
653     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
654     private static final double DEFAULT_INDEX_DENSITY = .005;
655
656     private String name = DEFAULT_NAME;
657     private StorageLevel storageLevel = StorageLevel.DISK;
658     private File directory = new File(DEFAULT_DIRECTORY);
659     private JournalSerdes namespace;
660     private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
661     private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
662     private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
663     private double indexDensity = DEFAULT_INDEX_DENSITY;
664     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
665
666     protected Builder() {
667     }
668
669     /**
670      * Sets the storage name.
671      *
672      * @param name The storage name.
673      * @return The storage builder.
674      */
675     public Builder<E> withName(String name) {
676       this.name = requireNonNull(name, "name cannot be null");
677       return this;
678     }
679
680     /**
681      * Sets the log storage level, returning the builder for method chaining.
682      * <p>
683      * The storage level indicates how individual entries should be persisted in the journal.
684      *
685      * @param storageLevel The log storage level.
686      * @return The storage builder.
687      */
688     public Builder<E> withStorageLevel(StorageLevel storageLevel) {
689       this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
690       return this;
691     }
692
693     /**
694      * Sets the log directory, returning the builder for method chaining.
695      * <p>
696      * The log will write segment files into the provided directory.
697      *
698      * @param directory The log directory.
699      * @return The storage builder.
700      * @throws NullPointerException If the {@code directory} is {@code null}
701      */
702     public Builder<E> withDirectory(String directory) {
703       return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
704     }
705
706     /**
707      * Sets the log directory, returning the builder for method chaining.
708      * <p>
709      * The log will write segment files into the provided directory.
710      *
711      * @param directory The log directory.
712      * @return The storage builder.
713      * @throws NullPointerException If the {@code directory} is {@code null}
714      */
715     public Builder<E> withDirectory(File directory) {
716       this.directory = requireNonNull(directory, "directory cannot be null");
717       return this;
718     }
719
720     /**
721      * Sets the journal namespace, returning the builder for method chaining.
722      *
723      * @param namespace The journal serializer.
724      * @return The journal builder.
725      */
726     public Builder<E> withNamespace(JournalSerdes namespace) {
727       this.namespace = requireNonNull(namespace, "namespace cannot be null");
728       return this;
729     }
730
731     /**
732      * Sets the maximum segment size in bytes, returning the builder for method chaining.
733      * <p>
734      * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
735      * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
736      * segment and append new entries to that segment.
737      * <p>
738      * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
739      *
740      * @param maxSegmentSize The maximum segment size in bytes.
741      * @return The storage builder.
742      * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
743      */
744     public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
745       checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
746       this.maxSegmentSize = maxSegmentSize;
747       return this;
748     }
749
750     /**
751      * Sets the maximum entry size in bytes, returning the builder for method chaining.
752      *
753      * @param maxEntrySize the maximum entry size in bytes
754      * @return the storage builder
755      * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
756      */
757     public Builder<E> withMaxEntrySize(int maxEntrySize) {
758       checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
759       this.maxEntrySize = maxEntrySize;
760       return this;
761     }
762
763     /**
764      * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
765      * <p>
766      * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
767      * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
768      * new segment and append new entries to that segment.
769      * <p>
770      * By default, the maximum entries per segment is {@code 1024 * 1024}.
771      *
772      * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
773      * @return The storage builder.
774      * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
775      *     per segment
776      * @deprecated since 3.0.2
777      */
778     @Deprecated
779     public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
780       checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
781       checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
782           "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
783       this.maxEntriesPerSegment = maxEntriesPerSegment;
784       return this;
785     }
786
787     /**
788      * Sets the journal index density.
789      * <p>
790      * The index density is the frequency at which the position of entries written to the journal will be recorded in an
791      * in-memory index for faster seeking.
792      *
793      * @param indexDensity the index density
794      * @return the journal builder
795      * @throws IllegalArgumentException if the density is not between 0 and 1
796      */
797     public Builder<E> withIndexDensity(double indexDensity) {
798       checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
799       this.indexDensity = indexDensity;
800       return this;
801     }
802
803     /**
804      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
805      * chaining.
806      * <p>
807      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
808      * committed in a given segment.
809      *
810      * @return The storage builder.
811      */
812     public Builder<E> withFlushOnCommit() {
813       return withFlushOnCommit(true);
814     }
815
816     /**
817      * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
818      * chaining.
819      * <p>
820      * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
821      * committed in a given segment.
822      *
823      * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
824      * @return The storage builder.
825      */
826     public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
827       this.flushOnCommit = flushOnCommit;
828       return this;
829     }
830
831     /**
832      * Build the {@link SegmentedJournal}.
833      *
834      * @return A new {@link SegmentedJournal}.
835      */
836     public SegmentedJournal<E> build() {
837       return new SegmentedJournal<>(
838           name,
839           storageLevel,
840           directory,
841           namespace,
842           maxSegmentSize,
843           maxEntrySize,
844           maxEntriesPerSegment,
845           indexDensity,
846           flushOnCommit);
847     }
848   }
849 }