2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.FileChannel;
27 import java.nio.file.StandardOpenOption;
28 import java.util.Collection;
30 import java.util.TreeMap;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentNavigableMap;
33 import java.util.concurrent.ConcurrentSkipListMap;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
40 public final class SegmentedJournal<E> implements Journal<E> {
42 * Returns a new Raft log builder.
44 * @return A new Raft log builder.
46 public static <E> Builder<E> builder() {
47 return new Builder<>();
50 private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
51 private static final int SEGMENT_BUFFER_FACTOR = 3;
53 private final String name;
54 private final StorageLevel storageLevel;
55 private final File directory;
56 private final JournalSerializer<E> serializer;
57 private final int maxSegmentSize;
58 private final int maxEntrySize;
59 private final int maxEntriesPerSegment;
60 private final double indexDensity;
61 private final boolean flushOnCommit;
62 private final SegmentedJournalWriter<E> writer;
63 private volatile long commitIndex;
65 private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
66 private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
67 private JournalSegment currentSegment;
69 private volatile boolean open = true;
// Creates a journal from explicit settings; SegmentedJournal.builder() supplies defaults for them.
// NOTE(review): several parameter lines are missing from this listing. The assignments below use name, directory,
// maxSegmentSize, maxEntrySize and indexDensity, none of which appear among the visible parameters. The lines after
// the writer assignment are missing as well.
71 public SegmentedJournal(
73 StorageLevel storageLevel,
75 JournalSerdes namespace,
78 int maxEntriesPerSegment,
80 boolean flushOnCommit) {
// Object-valued settings are null-checked here. Numeric limits are stored as given; only the Builder setters
// range-check them.
81 this.name = requireNonNull(name, "name cannot be null");
82 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
83 this.directory = requireNonNull(directory, "directory cannot be null");
// The JournalSerdes namespace is adapted into the typed JournalSerializer<E> used for this journal's entries.
84 this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
85 this.maxSegmentSize = maxSegmentSize;
86 this.maxEntrySize = maxEntrySize;
87 this.maxEntriesPerSegment = maxEntriesPerSegment;
88 this.indexDensity = indexDensity;
89 this.flushOnCommit = flushOnCommit;
// NOTE(review): `this` escapes to the writer before construction finishes. Confirm the writer's constructor does not
// read journal state such as currentSegment.
91 this.writer = new SegmentedJournalWriter<>(this);
// Accessor for the journal name. The name is the prefix of segment file names: it is passed to
// JournalSegmentFile.createSegmentFile(name, ...) and JournalSegmentFile.isSegmentFile(name, ...).
// NOTE(review): the method body is missing from this listing.
95 * Returns the segment file name prefix.
97 * @return The segment file name prefix.
99 public String name() {
// Accessor for the directory where segment files are created and from which they are loaded. The "prefix" mentioned
// below is the journal name returned by name().
// NOTE(review): the method body is missing from this listing.
104 * Returns the storage directory.
106 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
107 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
108 * when the log is opened.
110 * @return The storage directory.
112 public File directory() {
// Accessor for the configured StorageLevel, which newSegment() passes to every JournalSegment it creates.
// NOTE(review): the method body is missing from this listing.
117 * Returns the storage level.
119 * The storage level dictates how entries within individual journal segments should be stored.
121 * @return The storage level.
123 public StorageLevel storageLevel() {
128 * Returns the maximum journal segment size.
130 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
132 * @return The maximum segment size in bytes.
134 public int maxSegmentSize() {
135 return maxSegmentSize;
// Accessor for the maximum entry size. newSegment() passes it to every JournalSegment, and
// Builder.withMaxEntrySize() requires it to be positive.
// NOTE(review): the method body is missing from this listing.
139 * Returns the maximum journal entry size.
141 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
143 * @return the maximum entry size in bytes
145 public int maxEntrySize() {
150 * Returns the maximum number of entries per segment.
152 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
155 * @return The maximum number of entries per segment.
156 * @deprecated since 3.0.2
159 public int maxEntriesPerSegment() {
160 return maxEntriesPerSegment;
164 * Returns the collection of journal segments.
166 * @return the collection of journal segments
168 public Collection<JournalSegment> segments() {
169 return segments.values();
173 * Returns the collection of journal segments with indexes greater than the given index.
175 * @param index the starting index
176 * @return the journal segments starting with indexes greater than or equal to the given index
178 public Collection<JournalSegment> segments(long index) {
179 return segments.tailMap(index).values();
// Accessor for the serializer the constructor builds with JournalSerializer.wrap(namespace).
// NOTE(review): the method body is missing from this listing.
183 * Returns serializer instance.
185 * @return serializer instance
187 JournalSerializer<E> serializer() {
// Computes the journal size from the per-segment JournalSegment.size() values, presumably by summing them.
// NOTE(review): the method signature and the terminal stream operation are missing from this listing. The lambda
// could be written as the method reference JournalSegment::size.
192 * Returns the total size of the journal.
194 * @return the total size of the journal
197 return segments.values().stream()
198 .mapToLong(segment -> segment.size())
// Accessor for the SegmentedJournalWriter created in the constructor.
// NOTE(review): the method body, and any @Override annotation, is missing from this listing.
203 public JournalWriter<E> writer() {
208 public JournalReader<E> openReader(long index) {
209 return openReader(index, JournalReader.Mode.ALL);
// Opens a reader of the requested mode and advances it toward the given index.
// NOTE(review): the end of the switch expression and everything after the loop are missing from this listing.
// Presumably the reader is registered in `readers` (closeReader() removes it from there) and then returned; confirm.
213 * Opens a new Raft log reader with the given reader mode.
215 * @param index The index from which to begin reading entries.
216 * @param mode The mode in which to read entries.
217 * @return The Raft log reader.
219 public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
// Start in the segment that getSegment() selects for this index.
220 final var segment = getSegment(index);
221 final var reader = switch (mode) {
222 case ALL -> new SegmentedJournalReader<>(this, segment);
223 case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
// Step forward until the reader's next index reaches `index` or tryAdvance() reports that no further entry can be
// read. In the latter case the reader stays short of `index`.
226 // Forward reader to specified index
227 long next = reader.getNextIndex();
228 while (index > next && reader.tryAdvance()) {
229 next = reader.getNextIndex();
// Populates `segments` from disk and establishes currentSegment.
// NOTE(review): several lines are missing from this listing. Presumably they include the `} else {` that makes the
// descriptor/createSegment part run only when nothing was loaded (as the comment inside says), plus some descriptor
// builder calls such as the id, the index and build(); confirm.
237 * Opens the segments.
239 private synchronized void open() {
240 // Load existing log segments from disk.
// Each loaded segment is keyed by its descriptor's first index.
241 for (JournalSegment segment : loadSegments()) {
242 segments.put(segment.descriptor().index(), segment);
245 // If a segment doesn't already exist, create an initial segment starting at index 1.
// When segments were loaded, the one with the highest first index becomes current.
246 if (!segments.isEmpty()) {
247 currentSegment = segments.lastEntry().getValue();
249 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
252 .withMaxSegmentSize(maxSegmentSize)
253 .withMaxEntries(maxEntriesPerSegment)
256 currentSegment = createSegment(descriptor);
// Passes the current wall-clock time, in milliseconds, to the new segment's descriptor.
257 currentSegment.descriptor().update(System.currentTimeMillis());
259 segments.put(1L, currentSegment);
264 * Asserts that the manager is open.
266 * @throws IllegalStateException if the segment manager is not open
268 private void assertOpen() {
269 checkState(currentSegment != null, "journal not open");
273 * Asserts that enough disk space is available to allocate a new segment.
275 private void assertDiskSpace() {
276 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
277 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
// Re-establishes currentSegment after the segment map has changed; removeSegment() calls this.
// NOTE(review): the missing lines presumably include the `} else {` that separates the two cases, plus some
// descriptor builder calls such as the id, the index and build(); confirm. The descriptor/createSegment part
// appears to run only when no segment remains.
282 * Resets the current segment, creating a new segment if necessary.
284 private synchronized void resetCurrentSegment() {
285 JournalSegment lastSegment = getLastSegment();
286 if (lastSegment != null) {
287 currentSegment = lastSegment;
289 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
292 .withMaxSegmentSize(maxSegmentSize)
293 .withMaxEntries(maxEntriesPerSegment)
296 currentSegment = createSegment(descriptor);
// A freshly created replacement segment is registered under key 1.
298 segments.put(1L, currentSegment);
// Replaces the journal's segments with a single new segment that begins at `index`, unless `index` already equals
// the first segment's first index.
// NOTE(review): several lines are missing from this listing, including the early return after the equality check
// and the body of the loop over segments (presumably closing and deleting them, then clearing the map); confirm.
// NOTE(review): this method reassigns currentSegment but, unlike resetCurrentSegment(), getNextSegment() and
// removeSegment(), it is not declared synchronized. Also, getFirstSegment() returns null when there are no
// segments, so unless a missing line guards against that, the firstIndex() call below would throw
// NullPointerException.
303 * Resets and returns the first segment in the journal.
305 * @param index the starting index of the journal
306 * @return the first segment
308 JournalSegment resetSegments(long index) {
311 // If the index already equals the first segment index, skip the reset.
312 JournalSegment firstSegment = getFirstSegment();
313 if (index == firstSegment.firstIndex()) {
317 for (JournalSegment segment : segments.values()) {
323 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
326 .withMaxSegmentSize(maxSegmentSize)
327 .withMaxEntries(maxEntriesPerSegment)
329 currentSegment = createSegment(descriptor);
// The new segment is keyed by the requested starting index and becomes the current segment.
330 segments.put(index, currentSegment);
331 return currentSegment;
// Returns the segment with the lowest first index, or null when there are no segments.
// NOTE(review): the Javadoc promises IllegalStateException when the journal is not open, but the visible body does
// not check for that. The line between the signature and the lookup is missing from this listing and may contain
// the check (for example assertOpen()); confirm.
335 * Returns the first segment in the log.
337 * @throws IllegalStateException if the segment manager is not open
339 JournalSegment getFirstSegment() {
341 Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
342 return segment != null ? segment.getValue() : null;
// Returns the segment with the highest first index, or null when there are no segments.
// NOTE(review): the Javadoc promises IllegalStateException when the journal is not open, but the visible body does
// not check for that. The line between the signature and the lookup is missing from this listing and may contain
// the check; confirm.
346 * Returns the last segment in the log.
348 * @throws IllegalStateException if the segment manager is not open
350 JournalSegment getLastSegment() {
352 Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
353 return segment != null ? segment.getValue() : null;
// Allocates a new segment directly after the current one and makes it the current segment.
// NOTE(review): the missing lines probably hold the open and disk-space checks (assertOpen() and assertDiskSpace()
// are not called anywhere in the visible code) and the descriptor builder's terminal call; confirm. Without an
// open check, a null currentSegment (for example after close()) would fail with NullPointerException on lastIndex().
357 * Creates and returns the next segment.
359 * @return The next segment.
360 * @throws IllegalStateException if the segment manager is not open
362 synchronized JournalSegment getNextSegment() {
366 JournalSegment lastSegment = getLastSegment();
// The new segment's id is one greater than the last segment's id, or 1 for an empty journal. Its first index
// immediately follows the current segment's last index.
367 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
368 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
369 .withIndex(currentSegment.lastIndex() + 1)
370 .withMaxSegmentSize(maxSegmentSize)
371 .withMaxEntries(maxEntriesPerSegment)
374 currentSegment = createSegment(descriptor);
376 segments.put(descriptor.index(), currentSegment);
377 return currentSegment;
381 * Returns the segment following the segment with the given ID.
383 * @param index The segment index with which to look up the next segment.
384 * @return The next segment for the given index.
386 JournalSegment getNextSegment(long index) {
387 Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
388 return nextSegment != null ? nextSegment.getValue() : null;
// Resolves the segment that should contain `index`.
// NOTE(review): the line after the signature is missing from this listing; it may hold an open check matching the
// @throws below. Confirm.
392 * Returns the segment for the given index.
394 * @param index The index for which to return the segment.
395 * @throws IllegalStateException if the segment manager is not open
397 synchronized JournalSegment getSegment(long index) {
399 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
// Because the comparison is a strict '>', an index equal to the current segment's first index falls through to the
// map lookup below, which presumably finds the same segment under its first-index key.
400 if (currentSegment != null && index > currentSegment.firstIndex()) {
401 return currentSegment;
404 // If the index is in another segment, get the entry with the next lowest first index.
405 Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
406 if (segment != null) {
407 return segment.getValue();
// `index` precedes every segment, so fall back to the first segment (null if there are no segments).
409 return getFirstSegment();
// Removes a segment from the journal, then re-establishes currentSegment.
// NOTE(review): the Javadoc summary and the lines between the map removal and resetCurrentSegment() are missing from
// this listing (presumably they close and delete the segment); confirm.
415 * @param segment The segment to remove.
417 synchronized void removeSegment(JournalSegment segment) {
// Segments are keyed by their first index.
418 segments.remove(segment.firstIndex());
421 resetCurrentSegment();
// Creates the on-disk file for a new segment, writes the serialized descriptor through the file channel, and wraps
// the result with newSegment(). IOExceptions are rethrown as StorageException.
// NOTE(review): several lines are missing from this listing: the `channel` declaration, the `try` openers, the lines
// between copyTo() and write() (presumably buffer.flip()), and the cleanup after write(). Confirm.
// NOTE(review): if raf.setLength() throws, `raf` is already open and the visible catch rethrows without closing it,
// which leaks a file handle. Using try-with-resources, or closing in the catch, would avoid that.
425 * Creates a new segment.
427 JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
428 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
430 RandomAccessFile raf;
433 raf = new RandomAccessFile(segmentFile, "rw");
// Pre-size the file to the segment's maximum size.
434 raf.setLength(descriptor.maxSegmentSize());
435 channel = raf.getChannel();
436 } catch (IOException e) {
437 throw new StorageException(e);
// Serialize the descriptor into a header buffer of JournalSegmentDescriptor.BYTES bytes.
440 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
441 descriptor.copyTo(buffer);
444 channel.write(buffer);
445 } catch (IOException e) {
446 throw new StorageException(e);
// NOTE(review): this handler's body is missing from this listing; make sure it does not silently swallow the
// IOException.
451 } catch (IOException e) {
454 JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
455 LOG.debug("Created segment: {}", segment);
460 * Creates a new segment instance.
462 * @param segmentFile The segment file.
463 * @param descriptor The segment descriptor.
464 * @return The segment instance.
466 protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
467 return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
// Finds this journal's segment files in `directory`, wraps each one with newSegment(), and returns them ordered by
// first index after checking that consecutive segments are contiguous.
// NOTE(review): many lines are missing from this listing. Notably, the directory-creation step announced by the
// first comment is missing, and so is the handling of a misaligned segment. The `corrupted` flag and the explicit
// iterator suggest that segments from the first gap onward are dropped; confirm.
471 * Loads all segments from disk.
473 * @return A collection of segments for the log.
475 protected Collection<JournalSegment> loadSegments() {
476 // Ensure log directories are created.
// This local map shadows the `segments` field. As a TreeMap, it orders the loaded segments by first index.
479 final var segments = new TreeMap<Long, JournalSegment>();
481 // Iterate through all files in the log directory.
// NOTE(review): File.listFiles() returns null if `directory` is not a directory or an I/O error occurs. In that case
// this loop throws NullPointerException.
482 for (var file : directory.listFiles(File::isFile)) {
484 // If the file looks like a segment file, attempt to load the segment.
485 if (JournalSegmentFile.isSegmentFile(name, file)) {
486 // read the descriptor
487 final JournalSegmentDescriptor descriptor;
489 descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
490 } catch (IOException e) {
491 throw new StorageException(e);
495 final var segmentFile = new JournalSegmentFile(file);
496 final var segment = newSegment(segmentFile, descriptor);
497 LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
499 // Add the segment to the segments list.
500 segments.put(segment.firstIndex(), segment);
504 // Verify that all the segments in the log align with one another.
505 JournalSegment previousSegment = null;
506 boolean corrupted = false;
507 final var iterator = segments.entrySet().iterator();
508 while (iterator.hasNext()) {
509 final var segment = iterator.next().getValue();
// Contiguity check: each segment must start exactly one past the previous segment's last index.
510 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
511 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
512 previousSegment.file().file());
520 previousSegment = segment;
// Returns the TreeMap's values view, so callers iterate in ascending first-index order.
523 return segments.values();
// Notifies registered readers of a new journal head. Only readers whose next index lies before `index` are
// affected. compact() calls this with the first index of the oldest segment it keeps.
// NOTE(review): the per-reader action is missing from this listing.
527 * Resets journal readers to the given head.
529 * @param index The index at which to reset readers.
531 void resetHead(long index) {
532 for (SegmentedJournalReader<E> reader : readers) {
533 if (reader.getNextIndex() < index) {
// Notifies registered readers of a new journal tail. Only readers whose next index is at or beyond `index` are
// affected.
// NOTE(review): the per-reader action is missing from this listing.
540 * Resets journal readers to the given tail.
542 * @param index The index at which to reset readers.
544 void resetTail(long index) {
545 for (SegmentedJournalReader<E> reader : readers) {
546 if (reader.getNextIndex() >= index) {
552 void closeReader(SegmentedJournalReader<E> reader) {
553 readers.remove(reader);
// Reports whether the journal is open.
// NOTE(review): the method body, and any @Override annotation, is missing from this listing. Presumably it reads the
// volatile `open` flag; confirm.
557 public boolean isOpen() {
562 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
564 * @param index the index from which to remove segments
565 * @return indicates whether a segment can be removed from the journal
567 public boolean isCompactable(long index) {
568 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
569 return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
573 * Returns the index of the last segment in the log.
575 * @param index the compaction index
576 * @return the starting index of the last segment in the log
578 public long getCompactableIndex(long index) {
579 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
580 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
// Drops every segment that lies entirely before the segment whose first index is the greatest one not exceeding
// `index`. That segment, and all later ones, are kept.
// NOTE(review): the per-segment cleanup after the trace log is missing from this listing (presumably close and
// delete); confirm. Also, this method mutates `segments` but, unlike removeSegment(), is not synchronized.
584 * Compacts the journal up to the given index.
586 * The semantics of compaction are not specified by this interface.
588 * @param index The index up to which to compact the journal.
590 public void compact(long index) {
591 final var segmentEntry = segments.floorEntry(index);
592 if (segmentEntry != null) {
// headMap() is a live view that excludes its bound: it holds exactly the segments that start before the retained
// segment.
593 final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
594 if (!compactSegments.isEmpty()) {
595 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
596 for (JournalSegment segment : compactSegments.values()) {
597 LOG.trace("Deleting segment: {}", segment);
// Clearing the view removes those entries from `segments` itself.
601 compactSegments.clear();
// Let readers positioned before the new head react (see resetHead()).
602 resetHead(segmentEntry.getValue().firstIndex());
// Closes the journal. It visits every segment, then drops the current segment, after which assertOpen() fails.
// NOTE(review): the per-segment close call and the rest of the method are missing from this listing (presumably the
// rest also clears the `open` flag); confirm.
608 public void close() {
609 segments.values().forEach(segment -> {
610 LOG.debug("Closing segment: {}", segment);
613 currentSegment = null;
618 * Returns whether {@code flushOnCommit} is enabled for the log.
620 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
622 boolean isFlushOnCommit() {
623 return flushOnCommit;
627 * Commits entries up to the given index.
629 * @param index The index up to which to commit entries.
631 void setCommitIndex(long index) {
632 this.commitIndex = index;
// Accessor for the commit index last stored by setCommitIndex(). The backing field is volatile.
// NOTE(review): the method body is missing from this listing.
636 * Returns the Raft log commit index.
638 * @return The Raft log commit index.
640 long getCommitIndex() {
647 public static final class Builder<E> {
648 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
649 private static final String DEFAULT_NAME = "atomix";
650 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
651 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
652 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
653 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
654 private static final double DEFAULT_INDEX_DENSITY = .005;
656 private String name = DEFAULT_NAME;
657 private StorageLevel storageLevel = StorageLevel.DISK;
658 private File directory = new File(DEFAULT_DIRECTORY);
659 private JournalSerdes namespace;
660 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
661 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
662 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
663 private double indexDensity = DEFAULT_INDEX_DENSITY;
664 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
/**
 * Creates a builder populated with default settings. Callers normally obtain one through
 * {@link SegmentedJournal#builder()}.
 */
protected Builder() {
    // Nothing to do: every field is initialized by its declaration.
}
// Overrides DEFAULT_NAME ("atomix"). The journal uses this name as the prefix of its segment file names.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
670 * Sets the storage name.
672 * @param name The storage name.
673 * @return The storage builder.
675 public Builder<E> withName(String name) {
676 this.name = requireNonNull(name, "name cannot be null");
// Overrides the default storage level, StorageLevel.DISK.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
681 * Sets the log storage level, returning the builder for method chaining.
683 * The storage level indicates how individual entries should be persisted in the journal.
685 * @param storageLevel The log storage level.
686 * @return The storage builder.
688 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
689 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
694 * Sets the log directory, returning the builder for method chaining.
696 * The log will write segment files into the provided directory.
698 * @param directory The log directory.
699 * @return The storage builder.
700 * @throws NullPointerException If the {@code directory} is {@code null}
702 public Builder<E> withDirectory(String directory) {
703 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
// Overrides the default directory, which is the JVM's "user.dir" working directory.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
707 * Sets the log directory, returning the builder for method chaining.
709 * The log will write segment files into the provided directory.
711 * @param directory The log directory.
712 * @return The storage builder.
713 * @throws NullPointerException If the {@code directory} is {@code null}
715 public Builder<E> withDirectory(File directory) {
716 this.directory = requireNonNull(directory, "directory cannot be null");
// There is no default namespace: the field starts out null, and the SegmentedJournal constructor rejects a null
// namespace. This setter therefore has to be called before building, assuming build() forwards the field (its
// argument list is missing from this listing).
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
721 * Sets the journal namespace, returning the builder for method chaining.
723 * @param namespace The journal serializer.
724 * @return The journal builder.
726 public Builder<E> withNamespace(JournalSerdes namespace) {
727 this.namespace = requireNonNull(namespace, "namespace cannot be null");
// NOTE(review): the @throws text below says "not positive", but the check actually requires the size to exceed
// JournalSegmentDescriptor.BYTES, the size of the descriptor header that createSegment() writes into every segment
// file. The error message is also concatenated eagerly; checkArgument's "%s" template form would defer that. The
// trailing statement (presumably `return this;`) is missing from this listing.
732 * Sets the maximum segment size in bytes, returning the builder for method chaining.
734 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
735 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
736 * segment and append new entries to that segment.
738 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
740 * @param maxSegmentSize The maximum segment size in bytes.
741 * @return The storage builder.
742 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
744 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
745 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
746 this.maxSegmentSize = maxSegmentSize;
// Overrides DEFAULT_MAX_ENTRY_SIZE (1 MiB). The value must be strictly positive.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
751 * Sets the maximum entry size in bytes, returning the builder for method chaining.
753 * @param maxEntrySize the maximum entry size in bytes
754 * @return the storage builder
755 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
757 public Builder<E> withMaxEntrySize(int maxEntrySize) {
758 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
759 this.maxEntrySize = maxEntrySize;
// Accepts values from 1 to DEFAULT_MAX_ENTRIES_PER_SEGMENT (1024 * 1024), inclusive.
// NOTE(review): the @throws text below does not match the checks, which reject non-positive values and values
// greater than DEFAULT_MAX_ENTRIES_PER_SEGMENT. The Javadoc carries @deprecated, but the lines between it and the
// signature are missing from this listing, so confirm the method also has a matching @Deprecated annotation. The
// trailing statement (presumably `return this;`) is missing as well.
764 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
766 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
767 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
768 * new segment and append new entries to that segment.
770 * By default, the maximum entries per segment is {@code 1024 * 1024}.
772 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
773 * @return The storage builder.
774 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
776 * @deprecated since 3.0.2
779 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
780 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
781 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
782 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
783 this.maxEntriesPerSegment = maxEntriesPerSegment;
// Overrides DEFAULT_INDEX_DENSITY (0.005). Both bounds are exclusive, so 0 and 1 themselves are rejected.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
788 * Sets the journal index density.
790 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
791 * in-memory index for faster seeking.
793 * @param indexDensity the index density
794 * @return the journal builder
795 * @throws IllegalArgumentException if the density is not between 0 and 1
797 public Builder<E> withIndexDensity(double indexDensity) {
798 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
799 this.indexDensity = indexDensity;
804 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
807 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
808 * committed in a given segment.
810 * @return The storage builder.
812 public Builder<E> withFlushOnCommit() {
813 return withFlushOnCommit(true);
// Overrides DEFAULT_FLUSH_ON_COMMIT (false). The no-argument withFlushOnCommit() is shorthand for passing true.
// NOTE(review): the trailing statement (presumably `return this;`) is missing from this listing.
817 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
820 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
821 * committed in a given segment.
823 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
824 * @return The storage builder.
826 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
827 this.flushOnCommit = flushOnCommit;
832 * Build the {@link SegmentedJournal}.
834 * @return A new {@link SegmentedJournal}.
836 public SegmentedJournal<E> build() {
837 return new SegmentedJournal<>(
844 maxEntriesPerSegment,