/*
 * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.atomix.storage.journal;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
37 public final class SegmentedJournal<E> implements Journal<E> {
39 * Returns a new Raft log builder.
41 * @return A new Raft log builder.
43 public static <E> Builder<E> builder() {
44 return new Builder<>();
47 private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
48 private static final int SEGMENT_BUFFER_FACTOR = 3;
50 private final String name;
51 private final StorageLevel storageLevel;
52 private final File directory;
53 private final JournalSerializer<E> serializer;
54 private final int maxSegmentSize;
55 private final int maxEntrySize;
56 private final int maxEntriesPerSegment;
57 private final double indexDensity;
58 private final boolean flushOnCommit;
59 private final SegmentedJournalWriter<E> writer;
60 private volatile long commitIndex;
62 private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
63 private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
64 private JournalSegment currentSegment;
66 private volatile boolean open = true;
68 public SegmentedJournal(
70 StorageLevel storageLevel,
72 JournalSerdes namespace,
75 int maxEntriesPerSegment,
77 boolean flushOnCommit) {
78 this.name = requireNonNull(name, "name cannot be null");
79 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
80 this.directory = requireNonNull(directory, "directory cannot be null");
81 this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
82 this.maxSegmentSize = maxSegmentSize;
83 this.maxEntrySize = maxEntrySize;
84 this.maxEntriesPerSegment = maxEntriesPerSegment;
85 this.indexDensity = indexDensity;
86 this.flushOnCommit = flushOnCommit;
88 this.writer = new SegmentedJournalWriter<>(this);
92 * Returns the segment file name prefix.
94 * @return The segment file name prefix.
96 public String name() {
101 * Returns the storage directory.
103 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
104 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
105 * when the log is opened.
107 * @return The storage directory.
109 public File directory() {
114 * Returns the storage level.
116 * The storage level dictates how entries within individual journal segments should be stored.
118 * @return The storage level.
120 public StorageLevel storageLevel() {
125 * Returns the maximum journal segment size.
127 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
129 * @return The maximum segment size in bytes.
131 public int maxSegmentSize() {
132 return maxSegmentSize;
136 * Returns the maximum journal entry size.
138 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
140 * @return the maximum entry size in bytes
142 public int maxEntrySize() {
147 * Returns the maximum number of entries per segment.
149 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
152 * @return The maximum number of entries per segment.
153 * @deprecated since 3.0.2
156 public int maxEntriesPerSegment() {
157 return maxEntriesPerSegment;
161 * Returns the collection of journal segments.
163 * @return the collection of journal segments
165 public Collection<JournalSegment> segments() {
166 return segments.values();
170 * Returns the collection of journal segments with indexes greater than the given index.
172 * @param index the starting index
173 * @return the journal segments starting with indexes greater than or equal to the given index
175 public Collection<JournalSegment> segments(long index) {
176 return segments.tailMap(index).values();
180 * Returns serializer instance.
182 * @return serializer instance
184 JournalSerializer<E> serializer() {
189 * Returns the total size of the journal.
191 * @return the total size of the journal
194 return segments.values().stream()
195 .mapToLong(JournalSegment::size)
200 public JournalWriter<E> writer() {
205 public JournalReader<E> openReader(long index) {
206 return openReader(index, JournalReader.Mode.ALL);
210 * Opens a new Raft log reader with the given reader mode.
212 * @param index The index from which to begin reading entries.
213 * @param mode The mode in which to read entries.
214 * @return The Raft log reader.
217 public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
218 final var segment = getSegment(index);
219 final var reader = switch (mode) {
220 case ALL -> new SegmentedJournalReader<>(this, segment);
221 case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
224 // Forward reader to specified index
225 long next = reader.getNextIndex();
226 while (index > next && reader.tryAdvance()) {
227 next = reader.getNextIndex();
235 * Opens the segments.
237 private synchronized void open() {
238 // Load existing log segments from disk.
239 for (JournalSegment segment : loadSegments()) {
240 segments.put(segment.descriptor().index(), segment);
243 // If a segment doesn't already exist, create an initial segment starting at index 1.
244 if (!segments.isEmpty()) {
245 currentSegment = segments.lastEntry().getValue();
247 currentSegment = createSegment(1, 1);
248 segments.put(1L, currentSegment);
253 * Asserts that the manager is open.
255 * @throws IllegalStateException if the segment manager is not open
257 private void assertOpen() {
258 checkState(currentSegment != null, "journal not open");
262 * Asserts that enough disk space is available to allocate a new segment.
264 private void assertDiskSpace() {
265 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
266 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
271 * Resets the current segment, creating a new segment if necessary.
273 private synchronized void resetCurrentSegment() {
274 final var lastSegment = getLastSegment();
275 if (lastSegment == null) {
276 currentSegment = createSegment(1, 1);
277 segments.put(1L, currentSegment);
279 currentSegment = lastSegment;
284 * Resets and returns the first segment in the journal.
286 * @param index the starting index of the journal
287 * @return the first segment
289 JournalSegment resetSegments(long index) {
292 // If the index already equals the first segment index, skip the reset.
293 JournalSegment firstSegment = getFirstSegment();
294 if (index == firstSegment.firstIndex()) {
298 for (JournalSegment segment : segments.values()) {
304 currentSegment = createSegment(1, index);
305 segments.put(index, currentSegment);
306 return currentSegment;
310 * Returns the first segment in the log.
312 * @throws IllegalStateException if the segment manager is not open
314 JournalSegment getFirstSegment() {
316 Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
317 return segment != null ? segment.getValue() : null;
321 * Returns the last segment in the log.
323 * @throws IllegalStateException if the segment manager is not open
325 JournalSegment getLastSegment() {
327 Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
328 return segment != null ? segment.getValue() : null;
332 * Creates and returns the next segment.
334 * @return The next segment.
335 * @throws IllegalStateException if the segment manager is not open
337 synchronized JournalSegment getNextSegment() {
341 final var index = currentSegment.lastIndex() + 1;
342 final var lastSegment = getLastSegment();
343 currentSegment = createSegment(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1, index);
344 segments.put(index, currentSegment);
345 return currentSegment;
349 * Returns the segment following the segment with the given ID.
351 * @param index The segment index with which to look up the next segment.
352 * @return The next segment for the given index.
354 JournalSegment getNextSegment(long index) {
355 Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
356 return nextSegment != null ? nextSegment.getValue() : null;
360 * Returns the segment for the given index.
362 * @param index The index for which to return the segment.
363 * @throws IllegalStateException if the segment manager is not open
365 synchronized JournalSegment getSegment(long index) {
367 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
368 if (currentSegment != null && index > currentSegment.firstIndex()) {
369 return currentSegment;
372 // If the index is in another segment, get the entry with the next lowest first index.
373 Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
374 if (segment != null) {
375 return segment.getValue();
377 return getFirstSegment();
383 * @param segment The segment to remove.
385 synchronized void removeSegment(JournalSegment segment) {
386 segments.remove(segment.firstIndex());
389 resetCurrentSegment();
393 * Creates a new segment.
395 JournalSegment createSegment(long id, long index) {
396 final var segmentFile = JournalSegmentFile.createSegmentFile(name, directory, id);
397 final var descriptor = JournalSegmentDescriptor.builder()
400 .withMaxSegmentSize(maxSegmentSize)
401 .withMaxEntries(maxEntriesPerSegment)
402 .withUpdated(System.currentTimeMillis())
405 try (var raf = new RandomAccessFile(segmentFile, "rw")) {
406 raf.setLength(maxSegmentSize);
407 raf.write(descriptor.toArray());
408 } catch (IOException e) {
409 throw new StorageException(e);
412 final var segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
413 LOG.debug("Created segment: {}", segment);
418 * Creates a new segment instance.
420 * @param segmentFile The segment file.
421 * @param descriptor The segment descriptor.
422 * @return The segment instance.
424 protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
425 return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
429 * Loads all segments from disk.
431 * @return A collection of segments for the log.
433 protected Collection<JournalSegment> loadSegments() {
434 // Ensure log directories are created.
437 final var segments = new TreeMap<Long, JournalSegment>();
439 // Iterate through all files in the log directory.
440 for (var file : directory.listFiles(File::isFile)) {
442 // If the file looks like a segment file, attempt to load the segment.
443 if (JournalSegmentFile.isSegmentFile(name, file)) {
444 // read the descriptor
445 final JournalSegmentDescriptor descriptor;
447 descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
448 } catch (IOException e) {
449 throw new StorageException(e);
453 final var segmentFile = new JournalSegmentFile(file);
454 final var segment = newSegment(segmentFile, descriptor);
455 LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
457 // Add the segment to the segments list.
458 segments.put(segment.firstIndex(), segment);
462 // Verify that all the segments in the log align with one another.
463 JournalSegment previousSegment = null;
464 boolean corrupted = false;
465 final var iterator = segments.entrySet().iterator();
466 while (iterator.hasNext()) {
467 final var segment = iterator.next().getValue();
468 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
469 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
470 previousSegment.file().file());
478 previousSegment = segment;
481 return segments.values();
485 * Resets journal readers to the given head.
487 * @param index The index at which to reset readers.
489 void resetHead(long index) {
490 for (var reader : readers) {
491 if (reader.getNextIndex() < index) {
498 * Resets journal readers to the given tail.
500 * @param index The index at which to reset readers.
502 void resetTail(long index) {
503 for (var reader : readers) {
504 if (reader.getNextIndex() >= index) {
510 void closeReader(SegmentedJournalReader<E> reader) {
511 readers.remove(reader);
515 public boolean isOpen() {
520 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
522 * @param index the index from which to remove segments
523 * @return indicates whether a segment can be removed from the journal
525 public boolean isCompactable(long index) {
526 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
527 return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
531 * Returns the index of the last segment in the log.
533 * @param index the compaction index
534 * @return the starting index of the last segment in the log
536 public long getCompactableIndex(long index) {
537 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
538 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
542 * Compacts the journal up to the given index.
544 * The semantics of compaction are not specified by this interface.
546 * @param index The index up to which to compact the journal.
548 public void compact(long index) {
549 final var segmentEntry = segments.floorEntry(index);
550 if (segmentEntry != null) {
551 final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
552 if (!compactSegments.isEmpty()) {
553 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
554 for (JournalSegment segment : compactSegments.values()) {
555 LOG.trace("Deleting segment: {}", segment);
559 compactSegments.clear();
560 resetHead(segmentEntry.getValue().firstIndex());
566 public void close() {
567 segments.values().forEach(segment -> {
568 LOG.debug("Closing segment: {}", segment);
571 currentSegment = null;
576 * Returns whether {@code flushOnCommit} is enabled for the log.
578 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
580 boolean isFlushOnCommit() {
581 return flushOnCommit;
585 * Commits entries up to the given index.
587 * @param index The index up to which to commit entries.
589 void setCommitIndex(long index) {
590 this.commitIndex = index;
594 * Returns the Raft log commit index.
596 * @return The Raft log commit index.
598 long getCommitIndex() {
605 public static final class Builder<E> {
606 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
607 private static final String DEFAULT_NAME = "atomix";
608 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
609 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
610 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
611 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
612 private static final double DEFAULT_INDEX_DENSITY = .005;
614 private String name = DEFAULT_NAME;
615 private StorageLevel storageLevel = StorageLevel.DISK;
616 private File directory = new File(DEFAULT_DIRECTORY);
617 private JournalSerdes namespace;
618 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
619 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
620 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
621 private double indexDensity = DEFAULT_INDEX_DENSITY;
622 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
629 * Sets the storage name.
631 * @param name The storage name.
632 * @return The storage builder.
634 public Builder<E> withName(String name) {
635 this.name = requireNonNull(name, "name cannot be null");
640 * Sets the log storage level, returning the builder for method chaining.
642 * The storage level indicates how individual entries should be persisted in the journal.
644 * @param storageLevel The log storage level.
645 * @return The storage builder.
647 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
648 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
653 * Sets the log directory, returning the builder for method chaining.
655 * The log will write segment files into the provided directory.
657 * @param directory The log directory.
658 * @return The storage builder.
659 * @throws NullPointerException If the {@code directory} is {@code null}
661 public Builder<E> withDirectory(String directory) {
662 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
666 * Sets the log directory, returning the builder for method chaining.
668 * The log will write segment files into the provided directory.
670 * @param directory The log directory.
671 * @return The storage builder.
672 * @throws NullPointerException If the {@code directory} is {@code null}
674 public Builder<E> withDirectory(File directory) {
675 this.directory = requireNonNull(directory, "directory cannot be null");
680 * Sets the journal namespace, returning the builder for method chaining.
682 * @param namespace The journal serializer.
683 * @return The journal builder.
685 public Builder<E> withNamespace(JournalSerdes namespace) {
686 this.namespace = requireNonNull(namespace, "namespace cannot be null");
691 * Sets the maximum segment size in bytes, returning the builder for method chaining.
693 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
694 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
695 * segment and append new entries to that segment.
697 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
699 * @param maxSegmentSize The maximum segment size in bytes.
700 * @return The storage builder.
701 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
703 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
704 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
705 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
706 this.maxSegmentSize = maxSegmentSize;
711 * Sets the maximum entry size in bytes, returning the builder for method chaining.
713 * @param maxEntrySize the maximum entry size in bytes
714 * @return the storage builder
715 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
717 public Builder<E> withMaxEntrySize(int maxEntrySize) {
718 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
719 this.maxEntrySize = maxEntrySize;
724 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
726 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
727 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
728 * new segment and append new entries to that segment.
730 * By default, the maximum entries per segment is {@code 1024 * 1024}.
732 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
733 * @return The storage builder.
734 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
736 * @deprecated since 3.0.2
739 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
740 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
741 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
742 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
743 this.maxEntriesPerSegment = maxEntriesPerSegment;
748 * Sets the journal index density.
750 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
751 * in-memory index for faster seeking.
753 * @param indexDensity the index density
754 * @return the journal builder
755 * @throws IllegalArgumentException if the density is not between 0 and 1
757 public Builder<E> withIndexDensity(double indexDensity) {
758 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
759 this.indexDensity = indexDensity;
764 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
767 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
768 * committed in a given segment.
770 * @return The storage builder.
772 public Builder<E> withFlushOnCommit() {
773 return withFlushOnCommit(true);
777 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
780 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
781 * committed in a given segment.
783 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
784 * @return The storage builder.
786 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
787 this.flushOnCommit = flushOnCommit;
792 * Build the {@link SegmentedJournal}.
794 * @return A new {@link SegmentedJournal}.
796 public SegmentedJournal<E> build() {
797 return new SegmentedJournal<>(
804 maxEntriesPerSegment,