2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
42 public final class SegmentedJournal<E> implements Journal<E> {
44 * Returns a new Raft log builder.
46 * @return A new Raft log builder.
48 public static <E> Builder<E> builder() {
49 return new Builder<>();
52 private static final int SEGMENT_BUFFER_FACTOR = 3;
54 private final Logger log = LoggerFactory.getLogger(getClass());
55 private final String name;
56 private final StorageLevel storageLevel;
57 private final File directory;
58 private final JournalSerdes namespace;
59 private final int maxSegmentSize;
60 private final int maxEntrySize;
61 private final int maxEntriesPerSegment;
62 private final double indexDensity;
63 private final boolean flushOnCommit;
64 private final SegmentedJournalWriter<E> writer;
65 private volatile long commitIndex;
67 private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
68 private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
69 private JournalSegment<E> currentSegment;
71 private volatile boolean open = true;
73 public SegmentedJournal(
75 StorageLevel storageLevel,
77 JournalSerdes namespace,
80 int maxEntriesPerSegment,
82 boolean flushOnCommit) {
83 this.name = requireNonNull(name, "name cannot be null");
84 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
85 this.directory = requireNonNull(directory, "directory cannot be null");
86 this.namespace = requireNonNull(namespace, "namespace cannot be null");
87 this.maxSegmentSize = maxSegmentSize;
88 this.maxEntrySize = maxEntrySize;
89 this.maxEntriesPerSegment = maxEntriesPerSegment;
90 this.indexDensity = indexDensity;
91 this.flushOnCommit = flushOnCommit;
93 this.writer = new SegmentedJournalWriter<>(this);
97 * Returns the segment file name prefix.
99 * @return The segment file name prefix.
101 public String name() {
106 * Returns the storage directory.
108 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
109 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
110 * when the log is opened.
112 * @return The storage directory.
114 public File directory() {
119 * Returns the storage level.
121 * The storage level dictates how entries within individual journal segments should be stored.
123 * @return The storage level.
125 public StorageLevel storageLevel() {
130 * Returns the maximum journal segment size.
132 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
134 * @return The maximum segment size in bytes.
136 public int maxSegmentSize() {
137 return maxSegmentSize;
141 * Returns the maximum journal entry size.
143 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
145 * @return the maximum entry size in bytes
147 public int maxEntrySize() {
152 * Returns the maximum number of entries per segment.
154 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
157 * @return The maximum number of entries per segment.
158 * @deprecated since 3.0.2
161 public int maxEntriesPerSegment() {
162 return maxEntriesPerSegment;
166 * Returns the collection of journal segments.
168 * @return the collection of journal segments
170 public Collection<JournalSegment<E>> segments() {
171 return segments.values();
175 * Returns the collection of journal segments with indexes greater than the given index.
177 * @param index the starting index
178 * @return the journal segments starting with indexes greater than or equal to the given index
180 public Collection<JournalSegment<E>> segments(long index) {
181 return segments.tailMap(index).values();
185 * Returns the total size of the journal.
187 * @return the total size of the journal
190 return segments.values().stream()
191 .mapToLong(segment -> segment.size())
196 public JournalWriter<E> writer() {
201 public JournalReader<E> openReader(long index) {
202 return openReader(index, JournalReader.Mode.ALL);
206 * Opens a new Raft log reader with the given reader mode.
208 * @param index The index from which to begin reading entries.
209 * @param mode The mode in which to read entries.
210 * @return The Raft log reader.
212 public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
213 final var segment = getSegment(index);
214 final var reader = switch (mode) {
215 case ALL -> new SegmentedJournalReader<>(this, segment);
216 case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
219 // Forward reader to specified index
220 long next = reader.getNextIndex();
221 while (index > next && reader.tryNext() != null) {
222 next = reader.getNextIndex();
230 * Opens the segments.
232 private synchronized void open() {
233 // Load existing log segments from disk.
234 for (JournalSegment<E> segment : loadSegments()) {
235 segments.put(segment.descriptor().index(), segment);
238 // If a segment doesn't already exist, create an initial segment starting at index 1.
239 if (!segments.isEmpty()) {
240 currentSegment = segments.lastEntry().getValue();
242 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
245 .withMaxSegmentSize(maxSegmentSize)
246 .withMaxEntries(maxEntriesPerSegment)
249 currentSegment = createSegment(descriptor);
250 currentSegment.descriptor().update(System.currentTimeMillis());
252 segments.put(1L, currentSegment);
257 * Asserts that the manager is open.
259 * @throws IllegalStateException if the segment manager is not open
261 private void assertOpen() {
262 checkState(currentSegment != null, "journal not open");
266 * Asserts that enough disk space is available to allocate a new segment.
268 private void assertDiskSpace() {
269 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
270 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
275 * Resets the current segment, creating a new segment if necessary.
277 private synchronized void resetCurrentSegment() {
278 JournalSegment<E> lastSegment = getLastSegment();
279 if (lastSegment != null) {
280 currentSegment = lastSegment;
282 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
285 .withMaxSegmentSize(maxSegmentSize)
286 .withMaxEntries(maxEntriesPerSegment)
289 currentSegment = createSegment(descriptor);
291 segments.put(1L, currentSegment);
296 * Resets and returns the first segment in the journal.
298 * @param index the starting index of the journal
299 * @return the first segment
301 JournalSegment<E> resetSegments(long index) {
304 // If the index already equals the first segment index, skip the reset.
305 JournalSegment<E> firstSegment = getFirstSegment();
306 if (index == firstSegment.index()) {
310 for (JournalSegment<E> segment : segments.values()) {
316 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
319 .withMaxSegmentSize(maxSegmentSize)
320 .withMaxEntries(maxEntriesPerSegment)
322 currentSegment = createSegment(descriptor);
323 segments.put(index, currentSegment);
324 return currentSegment;
328 * Returns the first segment in the log.
330 * @throws IllegalStateException if the segment manager is not open
332 JournalSegment<E> getFirstSegment() {
334 Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
335 return segment != null ? segment.getValue() : null;
339 * Returns the last segment in the log.
341 * @throws IllegalStateException if the segment manager is not open
343 JournalSegment<E> getLastSegment() {
345 Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
346 return segment != null ? segment.getValue() : null;
350 * Creates and returns the next segment.
352 * @return The next segment.
353 * @throws IllegalStateException if the segment manager is not open
355 synchronized JournalSegment<E> getNextSegment() {
359 JournalSegment<E> lastSegment = getLastSegment();
360 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
361 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
362 .withIndex(currentSegment.lastIndex() + 1)
363 .withMaxSegmentSize(maxSegmentSize)
364 .withMaxEntries(maxEntriesPerSegment)
367 currentSegment = createSegment(descriptor);
369 segments.put(descriptor.index(), currentSegment);
370 return currentSegment;
374 * Returns the segment following the segment with the given ID.
376 * @param index The segment index with which to look up the next segment.
377 * @return The next segment for the given index.
379 JournalSegment<E> getNextSegment(long index) {
380 Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
381 return nextSegment != null ? nextSegment.getValue() : null;
385 * Returns the segment for the given index.
387 * @param index The index for which to return the segment.
388 * @throws IllegalStateException if the segment manager is not open
390 synchronized JournalSegment<E> getSegment(long index) {
392 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
393 if (currentSegment != null && index > currentSegment.index()) {
394 return currentSegment;
397 // If the index is in another segment, get the entry with the next lowest first index.
398 Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
399 if (segment != null) {
400 return segment.getValue();
402 return getFirstSegment();
408 * @param segment The segment to remove.
410 synchronized void removeSegment(JournalSegment<E> segment) {
411 segments.remove(segment.index());
414 resetCurrentSegment();
418 * Creates a new segment.
420 JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
421 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
423 RandomAccessFile raf;
426 raf = new RandomAccessFile(segmentFile, "rw");
427 raf.setLength(descriptor.maxSegmentSize());
428 channel = raf.getChannel();
429 } catch (IOException e) {
430 throw new StorageException(e);
433 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
434 descriptor.copyTo(buffer);
437 channel.write(buffer);
438 } catch (IOException e) {
439 throw new StorageException(e);
444 } catch (IOException e) {
447 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
448 log.debug("Created segment: {}", segment);
453 * Creates a new segment instance.
455 * @param segmentFile The segment file.
456 * @param descriptor The segment descriptor.
457 * @return The segment instance.
459 protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
460 return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
466 private JournalSegment<E> loadSegment(long segmentId) {
467 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
468 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
469 try (FileChannel channel = openChannel(segmentFile)) {
470 channel.read(buffer);
472 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
473 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
474 log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
476 } catch (IOException e) {
477 throw new StorageException(e);
481 private FileChannel openChannel(File file) {
483 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
484 } catch (IOException e) {
485 throw new StorageException(e);
490 * Loads all segments from disk.
492 * @return A collection of segments for the log.
494 protected Collection<JournalSegment<E>> loadSegments() {
495 // Ensure log directories are created.
498 TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
500 // Iterate through all files in the log directory.
501 for (File file : directory.listFiles(File::isFile)) {
503 // If the file looks like a segment file, attempt to load the segment.
504 if (JournalSegmentFile.isSegmentFile(name, file)) {
505 JournalSegmentFile segmentFile = new JournalSegmentFile(file);
506 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
507 try (FileChannel channel = openChannel(file)) {
508 channel.read(buffer);
510 } catch (IOException e) {
511 throw new StorageException(e);
514 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
517 JournalSegment<E> segment = loadSegment(descriptor.id());
519 // Add the segment to the segments list.
520 log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
521 segments.put(segment.index(), segment);
525 // Verify that all the segments in the log align with one another.
526 JournalSegment<E> previousSegment = null;
527 boolean corrupted = false;
528 Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
529 while (iterator.hasNext()) {
530 JournalSegment<E> segment = iterator.next().getValue();
531 if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
532 log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
540 previousSegment = segment;
543 return segments.values();
547 * Resets journal readers to the given head.
549 * @param index The index at which to reset readers.
551 void resetHead(long index) {
552 for (SegmentedJournalReader<E> reader : readers) {
553 if (reader.getNextIndex() < index) {
560 * Resets journal readers to the given tail.
562 * @param index The index at which to reset readers.
564 void resetTail(long index) {
565 for (SegmentedJournalReader<E> reader : readers) {
566 if (reader.getNextIndex() >= index) {
572 void closeReader(SegmentedJournalReader<E> reader) {
573 readers.remove(reader);
577 public boolean isOpen() {
582 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
584 * @param index the index from which to remove segments
585 * @return indicates whether a segment can be removed from the journal
587 public boolean isCompactable(long index) {
588 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
589 return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
593 * Returns the index of the last segment in the log.
595 * @param index the compaction index
596 * @return the starting index of the last segment in the log
598 public long getCompactableIndex(long index) {
599 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
600 return segmentEntry != null ? segmentEntry.getValue().index() : 0;
604 * Compacts the journal up to the given index.
606 * The semantics of compaction are not specified by this interface.
608 * @param index The index up to which to compact the journal.
610 public void compact(long index) {
611 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
612 if (segmentEntry != null) {
613 SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
614 if (!compactSegments.isEmpty()) {
615 log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
616 for (JournalSegment<E> segment : compactSegments.values()) {
617 log.trace("Deleting segment: {}", segment);
621 compactSegments.clear();
622 resetHead(segmentEntry.getValue().index());
628 public void close() {
629 segments.values().forEach(segment -> {
630 log.debug("Closing segment: {}", segment);
633 currentSegment = null;
638 * Returns whether {@code flushOnCommit} is enabled for the log.
640 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
642 boolean isFlushOnCommit() {
643 return flushOnCommit;
647 * Commits entries up to the given index.
649 * @param index The index up to which to commit entries.
651 void setCommitIndex(long index) {
652 this.commitIndex = index;
656 * Returns the Raft log commit index.
658 * @return The Raft log commit index.
660 long getCommitIndex() {
667 public static final class Builder<E> {
668 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
669 private static final String DEFAULT_NAME = "atomix";
670 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
671 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
672 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
673 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
674 private static final double DEFAULT_INDEX_DENSITY = .005;
676 private String name = DEFAULT_NAME;
677 private StorageLevel storageLevel = StorageLevel.DISK;
678 private File directory = new File(DEFAULT_DIRECTORY);
679 private JournalSerdes namespace;
680 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
681 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
682 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
683 private double indexDensity = DEFAULT_INDEX_DENSITY;
684 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
686 protected Builder() {
690 * Sets the storage name.
692 * @param name The storage name.
693 * @return The storage builder.
695 public Builder<E> withName(String name) {
696 this.name = requireNonNull(name, "name cannot be null");
701 * Sets the log storage level, returning the builder for method chaining.
703 * The storage level indicates how individual entries should be persisted in the journal.
705 * @param storageLevel The log storage level.
706 * @return The storage builder.
708 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
709 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
714 * Sets the log directory, returning the builder for method chaining.
716 * The log will write segment files into the provided directory.
718 * @param directory The log directory.
719 * @return The storage builder.
720 * @throws NullPointerException If the {@code directory} is {@code null}
722 public Builder<E> withDirectory(String directory) {
723 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
727 * Sets the log directory, returning the builder for method chaining.
729 * The log will write segment files into the provided directory.
731 * @param directory The log directory.
732 * @return The storage builder.
733 * @throws NullPointerException If the {@code directory} is {@code null}
735 public Builder<E> withDirectory(File directory) {
736 this.directory = requireNonNull(directory, "directory cannot be null");
741 * Sets the journal namespace, returning the builder for method chaining.
743 * @param namespace The journal serializer.
744 * @return The journal builder.
746 public Builder<E> withNamespace(JournalSerdes namespace) {
747 this.namespace = requireNonNull(namespace, "namespace cannot be null");
752 * Sets the maximum segment size in bytes, returning the builder for method chaining.
754 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
755 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
756 * segment and append new entries to that segment.
758 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
760 * @param maxSegmentSize The maximum segment size in bytes.
761 * @return The storage builder.
762 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
764 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
765 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
766 this.maxSegmentSize = maxSegmentSize;
771 * Sets the maximum entry size in bytes, returning the builder for method chaining.
773 * @param maxEntrySize the maximum entry size in bytes
774 * @return the storage builder
775 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
777 public Builder<E> withMaxEntrySize(int maxEntrySize) {
778 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
779 this.maxEntrySize = maxEntrySize;
784 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
786 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
787 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
788 * new segment and append new entries to that segment.
790 * By default, the maximum entries per segment is {@code 1024 * 1024}.
792 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
793 * @return The storage builder.
794 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
796 * @deprecated since 3.0.2
799 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
800 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
801 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
802 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
803 this.maxEntriesPerSegment = maxEntriesPerSegment;
808 * Sets the journal index density.
810 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
811 * in-memory index for faster seeking.
813 * @param indexDensity the index density
814 * @return the journal builder
815 * @throws IllegalArgumentException if the density is not between 0 and 1
817 public Builder<E> withIndexDensity(double indexDensity) {
818 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
819 this.indexDensity = indexDensity;
824 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
827 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
828 * committed in a given segment.
830 * @return The storage builder.
832 public Builder<E> withFlushOnCommit() {
833 return withFlushOnCommit(true);
837 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
840 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
841 * committed in a given segment.
843 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
844 * @return The storage builder.
846 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
847 this.flushOnCommit = flushOnCommit;
852 * Build the {@link SegmentedJournal}.
854 * @return A new {@link SegmentedJournal}.
856 public SegmentedJournal<E> build() {
857 return new SegmentedJournal<>(
864 maxEntriesPerSegment,