/*
 * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.atomix.storage.journal;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
41 public final class SegmentedJournal<E> implements Journal<E> {
43 * Returns a new Raft log builder.
45 * @return A new Raft log builder.
47 public static <E> Builder<E> builder() {
48 return new Builder<>();
51 private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
52 private static final int SEGMENT_BUFFER_FACTOR = 3;
54 private final String name;
55 private final StorageLevel storageLevel;
56 private final File directory;
57 private final JournalSerializer<E> serializer;
58 private final int maxSegmentSize;
59 private final int maxEntrySize;
60 private final int maxEntriesPerSegment;
61 private final double indexDensity;
62 private final boolean flushOnCommit;
63 private final SegmentedJournalWriter<E> writer;
64 private volatile long commitIndex;
66 private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
67 private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
68 private JournalSegment currentSegment;
70 private volatile boolean open = true;
72 public SegmentedJournal(
74 StorageLevel storageLevel,
76 JournalSerdes namespace,
79 int maxEntriesPerSegment,
81 boolean flushOnCommit) {
82 this.name = requireNonNull(name, "name cannot be null");
83 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
84 this.directory = requireNonNull(directory, "directory cannot be null");
85 this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
86 this.maxSegmentSize = maxSegmentSize;
87 this.maxEntrySize = maxEntrySize;
88 this.maxEntriesPerSegment = maxEntriesPerSegment;
89 this.indexDensity = indexDensity;
90 this.flushOnCommit = flushOnCommit;
92 this.writer = new SegmentedJournalWriter<>(this);
96 * Returns the segment file name prefix.
98 * @return The segment file name prefix.
100 public String name() {
105 * Returns the storage directory.
107 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
108 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
109 * when the log is opened.
111 * @return The storage directory.
113 public File directory() {
118 * Returns the storage level.
120 * The storage level dictates how entries within individual journal segments should be stored.
122 * @return The storage level.
124 public StorageLevel storageLevel() {
129 * Returns the maximum journal segment size.
131 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
133 * @return The maximum segment size in bytes.
135 public int maxSegmentSize() {
136 return maxSegmentSize;
140 * Returns the maximum journal entry size.
142 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
144 * @return the maximum entry size in bytes
146 public int maxEntrySize() {
151 * Returns the maximum number of entries per segment.
153 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
156 * @return The maximum number of entries per segment.
157 * @deprecated since 3.0.2
160 public int maxEntriesPerSegment() {
161 return maxEntriesPerSegment;
165 * Returns the collection of journal segments.
167 * @return the collection of journal segments
169 public Collection<JournalSegment> segments() {
170 return segments.values();
174 * Returns the collection of journal segments with indexes greater than the given index.
176 * @param index the starting index
177 * @return the journal segments starting with indexes greater than or equal to the given index
179 public Collection<JournalSegment> segments(long index) {
180 return segments.tailMap(index).values();
184 * Returns serializer instance.
186 * @return serializer instance
188 JournalSerializer<E> serializer() {
193 * Returns the total size of the journal.
195 * @return the total size of the journal
198 return segments.values().stream()
199 .mapToLong(segment -> segment.size())
204 public JournalWriter<E> writer() {
209 public JournalReader<E> openReader(long index) {
210 return openReader(index, JournalReader.Mode.ALL);
214 * Opens a new Raft log reader with the given reader mode.
216 * @param index The index from which to begin reading entries.
217 * @param mode The mode in which to read entries.
218 * @return The Raft log reader.
220 public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
221 final var segment = getSegment(index);
222 final var reader = switch (mode) {
223 case ALL -> new SegmentedJournalReader<>(this, segment);
224 case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
227 // Forward reader to specified index
228 long next = reader.getNextIndex();
229 while (index > next && reader.tryAdvance()) {
230 next = reader.getNextIndex();
238 * Opens the segments.
240 private synchronized void open() {
241 // Load existing log segments from disk.
242 for (JournalSegment segment : loadSegments()) {
243 segments.put(segment.descriptor().index(), segment);
246 // If a segment doesn't already exist, create an initial segment starting at index 1.
247 if (!segments.isEmpty()) {
248 currentSegment = segments.lastEntry().getValue();
250 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
253 .withMaxSegmentSize(maxSegmentSize)
254 .withMaxEntries(maxEntriesPerSegment)
257 currentSegment = createSegment(descriptor);
258 currentSegment.descriptor().update(System.currentTimeMillis());
260 segments.put(1L, currentSegment);
265 * Asserts that the manager is open.
267 * @throws IllegalStateException if the segment manager is not open
269 private void assertOpen() {
270 checkState(currentSegment != null, "journal not open");
274 * Asserts that enough disk space is available to allocate a new segment.
276 private void assertDiskSpace() {
277 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
278 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
283 * Resets the current segment, creating a new segment if necessary.
285 private synchronized void resetCurrentSegment() {
286 JournalSegment lastSegment = getLastSegment();
287 if (lastSegment != null) {
288 currentSegment = lastSegment;
290 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
293 .withMaxSegmentSize(maxSegmentSize)
294 .withMaxEntries(maxEntriesPerSegment)
297 currentSegment = createSegment(descriptor);
299 segments.put(1L, currentSegment);
304 * Resets and returns the first segment in the journal.
306 * @param index the starting index of the journal
307 * @return the first segment
309 JournalSegment resetSegments(long index) {
312 // If the index already equals the first segment index, skip the reset.
313 JournalSegment firstSegment = getFirstSegment();
314 if (index == firstSegment.firstIndex()) {
318 for (JournalSegment segment : segments.values()) {
324 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
327 .withMaxSegmentSize(maxSegmentSize)
328 .withMaxEntries(maxEntriesPerSegment)
330 currentSegment = createSegment(descriptor);
331 segments.put(index, currentSegment);
332 return currentSegment;
336 * Returns the first segment in the log.
338 * @throws IllegalStateException if the segment manager is not open
340 JournalSegment getFirstSegment() {
342 Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
343 return segment != null ? segment.getValue() : null;
347 * Returns the last segment in the log.
349 * @throws IllegalStateException if the segment manager is not open
351 JournalSegment getLastSegment() {
353 Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
354 return segment != null ? segment.getValue() : null;
358 * Creates and returns the next segment.
360 * @return The next segment.
361 * @throws IllegalStateException if the segment manager is not open
363 synchronized JournalSegment getNextSegment() {
367 JournalSegment lastSegment = getLastSegment();
368 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
369 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
370 .withIndex(currentSegment.lastIndex() + 1)
371 .withMaxSegmentSize(maxSegmentSize)
372 .withMaxEntries(maxEntriesPerSegment)
375 currentSegment = createSegment(descriptor);
377 segments.put(descriptor.index(), currentSegment);
378 return currentSegment;
382 * Returns the segment following the segment with the given ID.
384 * @param index The segment index with which to look up the next segment.
385 * @return The next segment for the given index.
387 JournalSegment getNextSegment(long index) {
388 Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
389 return nextSegment != null ? nextSegment.getValue() : null;
393 * Returns the segment for the given index.
395 * @param index The index for which to return the segment.
396 * @throws IllegalStateException if the segment manager is not open
398 synchronized JournalSegment getSegment(long index) {
400 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
401 if (currentSegment != null && index > currentSegment.firstIndex()) {
402 return currentSegment;
405 // If the index is in another segment, get the entry with the next lowest first index.
406 Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
407 if (segment != null) {
408 return segment.getValue();
410 return getFirstSegment();
416 * @param segment The segment to remove.
418 synchronized void removeSegment(JournalSegment segment) {
419 segments.remove(segment.firstIndex());
422 resetCurrentSegment();
426 * Creates a new segment.
428 JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
429 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
431 RandomAccessFile raf;
434 raf = new RandomAccessFile(segmentFile, "rw");
435 raf.setLength(descriptor.maxSegmentSize());
436 channel = raf.getChannel();
437 } catch (IOException e) {
438 throw new StorageException(e);
441 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
442 descriptor.copyTo(buffer);
445 channel.write(buffer);
446 } catch (IOException e) {
447 throw new StorageException(e);
452 } catch (IOException e) {
455 JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
456 LOG.debug("Created segment: {}", segment);
461 * Creates a new segment instance.
463 * @param segmentFile The segment file.
464 * @param descriptor The segment descriptor.
465 * @return The segment instance.
467 protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
468 return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
474 private JournalSegment loadSegment(long segmentId) {
475 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
476 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
477 try (FileChannel channel = openChannel(segmentFile)) {
478 channel.read(buffer);
480 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
481 JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
482 LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
484 } catch (IOException e) {
485 throw new StorageException(e);
489 private FileChannel openChannel(File file) {
491 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
492 } catch (IOException e) {
493 throw new StorageException(e);
498 * Loads all segments from disk.
500 * @return A collection of segments for the log.
502 protected Collection<JournalSegment> loadSegments() {
503 // Ensure log directories are created.
506 TreeMap<Long, JournalSegment> segments = new TreeMap<>();
508 // Iterate through all files in the log directory.
509 for (File file : directory.listFiles(File::isFile)) {
511 // If the file looks like a segment file, attempt to load the segment.
512 if (JournalSegmentFile.isSegmentFile(name, file)) {
513 JournalSegmentFile segmentFile = new JournalSegmentFile(file);
514 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
515 try (FileChannel channel = openChannel(file)) {
516 channel.read(buffer);
518 } catch (IOException e) {
519 throw new StorageException(e);
522 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
525 JournalSegment segment = loadSegment(descriptor.id());
527 // Add the segment to the segments list.
528 LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
529 segments.put(segment.firstIndex(), segment);
533 // Verify that all the segments in the log align with one another.
534 JournalSegment previousSegment = null;
535 boolean corrupted = false;
536 Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
537 while (iterator.hasNext()) {
538 JournalSegment segment = iterator.next().getValue();
539 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
540 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
548 previousSegment = segment;
551 return segments.values();
555 * Resets journal readers to the given head.
557 * @param index The index at which to reset readers.
559 void resetHead(long index) {
560 for (SegmentedJournalReader<E> reader : readers) {
561 if (reader.getNextIndex() < index) {
568 * Resets journal readers to the given tail.
570 * @param index The index at which to reset readers.
572 void resetTail(long index) {
573 for (SegmentedJournalReader<E> reader : readers) {
574 if (reader.getNextIndex() >= index) {
580 void closeReader(SegmentedJournalReader<E> reader) {
581 readers.remove(reader);
585 public boolean isOpen() {
590 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
592 * @param index the index from which to remove segments
593 * @return indicates whether a segment can be removed from the journal
595 public boolean isCompactable(long index) {
596 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
597 return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
601 * Returns the index of the last segment in the log.
603 * @param index the compaction index
604 * @return the starting index of the last segment in the log
606 public long getCompactableIndex(long index) {
607 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
608 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
612 * Compacts the journal up to the given index.
614 * The semantics of compaction are not specified by this interface.
616 * @param index The index up to which to compact the journal.
618 public void compact(long index) {
619 final var segmentEntry = segments.floorEntry(index);
620 if (segmentEntry != null) {
621 final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
622 if (!compactSegments.isEmpty()) {
623 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
624 for (JournalSegment segment : compactSegments.values()) {
625 LOG.trace("Deleting segment: {}", segment);
629 compactSegments.clear();
630 resetHead(segmentEntry.getValue().firstIndex());
636 public void close() {
637 segments.values().forEach(segment -> {
638 LOG.debug("Closing segment: {}", segment);
641 currentSegment = null;
646 * Returns whether {@code flushOnCommit} is enabled for the log.
648 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
650 boolean isFlushOnCommit() {
651 return flushOnCommit;
655 * Commits entries up to the given index.
657 * @param index The index up to which to commit entries.
659 void setCommitIndex(long index) {
660 this.commitIndex = index;
664 * Returns the Raft log commit index.
666 * @return The Raft log commit index.
668 long getCommitIndex() {
675 public static final class Builder<E> {
676 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
677 private static final String DEFAULT_NAME = "atomix";
678 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
679 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
680 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
681 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
682 private static final double DEFAULT_INDEX_DENSITY = .005;
684 private String name = DEFAULT_NAME;
685 private StorageLevel storageLevel = StorageLevel.DISK;
686 private File directory = new File(DEFAULT_DIRECTORY);
687 private JournalSerdes namespace;
688 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
689 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
690 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
691 private double indexDensity = DEFAULT_INDEX_DENSITY;
692 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
694 protected Builder() {
698 * Sets the storage name.
700 * @param name The storage name.
701 * @return The storage builder.
703 public Builder<E> withName(String name) {
704 this.name = requireNonNull(name, "name cannot be null");
709 * Sets the log storage level, returning the builder for method chaining.
711 * The storage level indicates how individual entries should be persisted in the journal.
713 * @param storageLevel The log storage level.
714 * @return The storage builder.
716 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
717 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
722 * Sets the log directory, returning the builder for method chaining.
724 * The log will write segment files into the provided directory.
726 * @param directory The log directory.
727 * @return The storage builder.
728 * @throws NullPointerException If the {@code directory} is {@code null}
730 public Builder<E> withDirectory(String directory) {
731 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
735 * Sets the log directory, returning the builder for method chaining.
737 * The log will write segment files into the provided directory.
739 * @param directory The log directory.
740 * @return The storage builder.
741 * @throws NullPointerException If the {@code directory} is {@code null}
743 public Builder<E> withDirectory(File directory) {
744 this.directory = requireNonNull(directory, "directory cannot be null");
749 * Sets the journal namespace, returning the builder for method chaining.
751 * @param namespace The journal serializer.
752 * @return The journal builder.
754 public Builder<E> withNamespace(JournalSerdes namespace) {
755 this.namespace = requireNonNull(namespace, "namespace cannot be null");
760 * Sets the maximum segment size in bytes, returning the builder for method chaining.
762 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
763 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
764 * segment and append new entries to that segment.
766 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
768 * @param maxSegmentSize The maximum segment size in bytes.
769 * @return The storage builder.
770 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
772 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
773 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
774 this.maxSegmentSize = maxSegmentSize;
779 * Sets the maximum entry size in bytes, returning the builder for method chaining.
781 * @param maxEntrySize the maximum entry size in bytes
782 * @return the storage builder
783 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
785 public Builder<E> withMaxEntrySize(int maxEntrySize) {
786 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
787 this.maxEntrySize = maxEntrySize;
792 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
794 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
795 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
796 * new segment and append new entries to that segment.
798 * By default, the maximum entries per segment is {@code 1024 * 1024}.
800 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
801 * @return The storage builder.
802 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
804 * @deprecated since 3.0.2
807 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
808 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
809 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
810 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
811 this.maxEntriesPerSegment = maxEntriesPerSegment;
816 * Sets the journal index density.
818 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
819 * in-memory index for faster seeking.
821 * @param indexDensity the index density
822 * @return the journal builder
823 * @throws IllegalArgumentException if the density is not between 0 and 1
825 public Builder<E> withIndexDensity(double indexDensity) {
826 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
827 this.indexDensity = indexDensity;
832 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
835 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
836 * committed in a given segment.
838 * @return The storage builder.
840 public Builder<E> withFlushOnCommit() {
841 return withFlushOnCommit(true);
845 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
848 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
849 * committed in a given segment.
851 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
852 * @return The storage builder.
854 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
855 this.flushOnCommit = flushOnCommit;
860 * Build the {@link SegmentedJournal}.
862 * @return A new {@link SegmentedJournal}.
864 public SegmentedJournal<E> build() {
865 return new SegmentedJournal<>(
872 maxEntriesPerSegment,