2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
23 import java.io.IOException;
24 import java.util.Collection;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
36 public final class SegmentedJournal<E> implements Journal<E> {
38 * Returns a new Raft log builder.
40 * @return A new Raft log builder.
42 public static <E> Builder<E> builder() {
43 return new Builder<>();
46 private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
47 private static final int SEGMENT_BUFFER_FACTOR = 3;
49 private final String name;
50 private final StorageLevel storageLevel;
51 private final File directory;
52 private final JournalSerializer<E> serializer;
53 private final int maxSegmentSize;
54 private final int maxEntrySize;
55 private final int maxEntriesPerSegment;
56 private final double indexDensity;
57 private final boolean flushOnCommit;
58 private final SegmentedJournalWriter<E> writer;
59 private volatile long commitIndex;
61 private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
62 private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
63 private JournalSegment currentSegment;
65 private volatile boolean open = true;
67 public SegmentedJournal(
69 StorageLevel storageLevel,
71 JournalSerdes namespace,
74 int maxEntriesPerSegment,
76 boolean flushOnCommit) {
77 this.name = requireNonNull(name, "name cannot be null");
78 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
79 this.directory = requireNonNull(directory, "directory cannot be null");
80 this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
81 this.maxSegmentSize = maxSegmentSize;
82 this.maxEntrySize = maxEntrySize;
83 this.maxEntriesPerSegment = maxEntriesPerSegment;
84 this.indexDensity = indexDensity;
85 this.flushOnCommit = flushOnCommit;
87 this.writer = new SegmentedJournalWriter<>(this);
91 * Returns the segment file name prefix.
93 * @return The segment file name prefix.
95 public String name() {
100 * Returns the storage directory.
102 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
103 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
104 * when the log is opened.
106 * @return The storage directory.
108 public File directory() {
113 * Returns the storage level.
115 * The storage level dictates how entries within individual journal segments should be stored.
117 * @return The storage level.
119 public StorageLevel storageLevel() {
124 * Returns the maximum journal segment size.
126 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
128 * @return The maximum segment size in bytes.
130 public int maxSegmentSize() {
131 return maxSegmentSize;
135 * Returns the maximum journal entry size.
137 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
139 * @return the maximum entry size in bytes
141 public int maxEntrySize() {
146 * Returns the maximum number of entries per segment.
148 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
151 * @return The maximum number of entries per segment.
152 * @deprecated since 3.0.2
155 public int maxEntriesPerSegment() {
156 return maxEntriesPerSegment;
160 * Returns the collection of journal segments.
162 * @return the collection of journal segments
164 public Collection<JournalSegment> segments() {
165 return segments.values();
169 * Returns the collection of journal segments with indexes greater than the given index.
171 * @param index the starting index
172 * @return the journal segments starting with indexes greater than or equal to the given index
174 public Collection<JournalSegment> segments(long index) {
175 return segments.tailMap(index).values();
179 * Returns serializer instance.
181 * @return serializer instance
183 JournalSerializer<E> serializer() {
188 * Returns the total size of the journal.
190 * @return the total size of the journal
193 return segments.values().stream()
194 .mapToLong(segment -> {
196 return segment.file().size();
197 } catch (IOException e) {
198 throw new StorageException(e);
205 public JournalWriter<E> writer() {
210 public JournalReader<E> openReader(long index) {
211 return openReader(index, JournalReader.Mode.ALL);
215 * Opens a new Raft log reader with the given reader mode.
217 * @param index The index from which to begin reading entries.
218 * @param mode The mode in which to read entries.
219 * @return The Raft log reader.
222 public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
223 final var segment = getSegment(index);
224 final var reader = switch (mode) {
225 case ALL -> new SegmentedJournalReader<>(this, segment);
226 case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
229 // Forward reader to specified index
230 long next = reader.getNextIndex();
231 while (index > next && reader.tryAdvance()) {
232 next = reader.getNextIndex();
240 * Opens the segments.
242 private synchronized void open() {
243 // Load existing log segments from disk.
244 for (var segment : loadSegments()) {
245 segments.put(segment.firstIndex(), segment);
248 // If a segment doesn't already exist, create an initial segment starting at index 1.
249 if (!segments.isEmpty()) {
250 currentSegment = segments.lastEntry().getValue();
252 currentSegment = createSegment(1, 1);
253 segments.put(1L, currentSegment);
258 * Asserts that the manager is open.
260 * @throws IllegalStateException if the segment manager is not open
262 private void assertOpen() {
263 checkState(currentSegment != null, "journal not open");
267 * Asserts that enough disk space is available to allocate a new segment.
269 private void assertDiskSpace() {
270 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
271 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
276 * Resets the current segment, creating a new segment if necessary.
278 private synchronized void resetCurrentSegment() {
279 final var lastSegment = getLastSegment();
280 if (lastSegment == null) {
281 currentSegment = createSegment(1, 1);
282 segments.put(1L, currentSegment);
284 currentSegment = lastSegment;
289 * Resets and returns the first segment in the journal.
291 * @param index the starting index of the journal
292 * @return the first segment
294 JournalSegment resetSegments(long index) {
297 // If the index already equals the first segment index, skip the reset.
298 final var firstSegment = getFirstSegment();
299 if (index == firstSegment.firstIndex()) {
303 segments.values().forEach(JournalSegment::delete);
306 currentSegment = createSegment(1, index);
307 segments.put(index, currentSegment);
308 return currentSegment;
312 * Returns the first segment in the log.
314 * @throws IllegalStateException if the segment manager is not open
316 JournalSegment getFirstSegment() {
318 Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
319 return segment != null ? segment.getValue() : null;
323 * Returns the last segment in the log.
325 * @throws IllegalStateException if the segment manager is not open
327 JournalSegment getLastSegment() {
329 Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
330 return segment != null ? segment.getValue() : null;
334 * Creates and returns the next segment.
336 * @return The next segment.
337 * @throws IllegalStateException if the segment manager is not open
339 synchronized JournalSegment getNextSegment() {
343 final var index = currentSegment.lastIndex() + 1;
344 final var lastSegment = getLastSegment();
345 currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
346 segments.put(index, currentSegment);
347 return currentSegment;
351 * Returns the segment following the segment with the given ID.
353 * @param index The segment index with which to look up the next segment.
354 * @return The next segment for the given index.
356 JournalSegment getNextSegment(long index) {
357 Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
358 return nextSegment != null ? nextSegment.getValue() : null;
362 * Returns the segment for the given index.
364 * @param index The index for which to return the segment.
365 * @throws IllegalStateException if the segment manager is not open
367 synchronized JournalSegment getSegment(long index) {
369 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
370 if (currentSegment != null && index > currentSegment.firstIndex()) {
371 return currentSegment;
374 // If the index is in another segment, get the entry with the next lowest first index.
375 Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
376 if (segment != null) {
377 return segment.getValue();
379 return getFirstSegment();
385 * @param segment The segment to remove.
387 synchronized void removeSegment(JournalSegment segment) {
388 segments.remove(segment.firstIndex());
390 resetCurrentSegment();
394 * Creates a new segment.
396 JournalSegment createSegment(long id, long index) {
397 final JournalSegmentFile file;
399 file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
402 .withMaxSegmentSize(maxSegmentSize)
403 .withMaxEntries(maxEntriesPerSegment)
404 .withUpdated(System.currentTimeMillis())
406 } catch (IOException e) {
407 throw new StorageException(e);
410 final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
411 LOG.debug("Created segment: {}", segment);
416 * Loads all segments from disk.
418 * @return A collection of segments for the log.
420 protected Collection<JournalSegment> loadSegments() {
421 // Ensure log directories are created.
424 final var segments = new TreeMap<Long, JournalSegment>();
426 // Iterate through all files in the log directory.
427 for (var file : directory.listFiles(File::isFile)) {
429 // If the file looks like a segment file, attempt to load the segment.
430 if (JournalSegmentFile.isSegmentFile(name, file)) {
431 final JournalSegmentFile segmentFile;
433 segmentFile = JournalSegmentFile.openExisting(file.toPath());
434 } catch (IOException e) {
435 throw new StorageException(e);
439 LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
441 // Add the segment to the segments list.
442 final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
443 segments.put(segment.firstIndex(), segment);
447 // Verify that all the segments in the log align with one another.
448 JournalSegment previousSegment = null;
449 boolean corrupted = false;
450 final var iterator = segments.entrySet().iterator();
451 while (iterator.hasNext()) {
452 final var segment = iterator.next().getValue();
453 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
454 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
455 previousSegment.file().path());
462 previousSegment = segment;
465 return segments.values();
469 * Resets journal readers to the given head.
471 * @param index The index at which to reset readers.
473 void resetHead(long index) {
474 for (var reader : readers) {
475 if (reader.getNextIndex() < index) {
482 * Resets journal readers to the given tail.
484 * @param index The index at which to reset readers.
486 void resetTail(long index) {
487 for (var reader : readers) {
488 if (reader.getNextIndex() >= index) {
494 void closeReader(SegmentedJournalReader<E> reader) {
495 readers.remove(reader);
499 public boolean isOpen() {
504 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
506 * @param index the index from which to remove segments
507 * @return indicates whether a segment can be removed from the journal
509 public boolean isCompactable(long index) {
510 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
511 return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
515 * Returns the index of the last segment in the log.
517 * @param index the compaction index
518 * @return the starting index of the last segment in the log
520 public long getCompactableIndex(long index) {
521 Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
522 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
526 * Compacts the journal up to the given index.
528 * The semantics of compaction are not specified by this interface.
530 * @param index The index up to which to compact the journal.
532 public void compact(long index) {
533 final var segmentEntry = segments.floorEntry(index);
534 if (segmentEntry != null) {
535 final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
536 if (!compactSegments.isEmpty()) {
537 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
538 compactSegments.values().forEach(JournalSegment::delete);
539 compactSegments.clear();
540 resetHead(segmentEntry.getValue().firstIndex());
546 public void close() {
547 segments.values().forEach(JournalSegment::close);
548 currentSegment = null;
553 * Returns whether {@code flushOnCommit} is enabled for the log.
555 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
557 boolean isFlushOnCommit() {
558 return flushOnCommit;
562 * Commits entries up to the given index.
564 * @param index The index up to which to commit entries.
566 void setCommitIndex(long index) {
567 this.commitIndex = index;
571 * Returns the Raft log commit index.
573 * @return The Raft log commit index.
575 long getCommitIndex() {
582 public static final class Builder<E> {
583 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
584 private static final String DEFAULT_NAME = "atomix";
585 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
586 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
587 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
588 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
589 private static final double DEFAULT_INDEX_DENSITY = .005;
591 private String name = DEFAULT_NAME;
592 private StorageLevel storageLevel = StorageLevel.DISK;
593 private File directory = new File(DEFAULT_DIRECTORY);
594 private JournalSerdes namespace;
595 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
596 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
597 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
598 private double indexDensity = DEFAULT_INDEX_DENSITY;
599 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
606 * Sets the storage name.
608 * @param name The storage name.
609 * @return The storage builder.
611 public Builder<E> withName(String name) {
612 this.name = requireNonNull(name, "name cannot be null");
617 * Sets the log storage level, returning the builder for method chaining.
619 * The storage level indicates how individual entries should be persisted in the journal.
621 * @param storageLevel The log storage level.
622 * @return The storage builder.
624 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
625 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
630 * Sets the log directory, returning the builder for method chaining.
632 * The log will write segment files into the provided directory.
634 * @param directory The log directory.
635 * @return The storage builder.
636 * @throws NullPointerException If the {@code directory} is {@code null}
638 public Builder<E> withDirectory(String directory) {
639 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
643 * Sets the log directory, returning the builder for method chaining.
645 * The log will write segment files into the provided directory.
647 * @param directory The log directory.
648 * @return The storage builder.
649 * @throws NullPointerException If the {@code directory} is {@code null}
651 public Builder<E> withDirectory(File directory) {
652 this.directory = requireNonNull(directory, "directory cannot be null");
657 * Sets the journal namespace, returning the builder for method chaining.
659 * @param namespace The journal serializer.
660 * @return The journal builder.
662 public Builder<E> withNamespace(JournalSerdes namespace) {
663 this.namespace = requireNonNull(namespace, "namespace cannot be null");
668 * Sets the maximum segment size in bytes, returning the builder for method chaining.
670 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
671 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
672 * segment and append new entries to that segment.
674 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
676 * @param maxSegmentSize The maximum segment size in bytes.
677 * @return The storage builder.
 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not greater than {@link JournalSegmentDescriptor#BYTES}
680 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
681 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
682 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
683 this.maxSegmentSize = maxSegmentSize;
688 * Sets the maximum entry size in bytes, returning the builder for method chaining.
690 * @param maxEntrySize the maximum entry size in bytes
691 * @return the storage builder
692 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
694 public Builder<E> withMaxEntrySize(int maxEntrySize) {
695 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
696 this.maxEntrySize = maxEntrySize;
 * Sets the maximum number of allowed entries per segment, returning the builder for method chaining.
703 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
704 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
705 * new segment and append new entries to that segment.
707 * By default, the maximum entries per segment is {@code 1024 * 1024}.
709 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
710 * @return The storage builder.
711 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
713 * @deprecated since 3.0.2
716 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
717 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
718 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
719 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
720 this.maxEntriesPerSegment = maxEntriesPerSegment;
725 * Sets the journal index density.
727 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
728 * in-memory index for faster seeking.
730 * @param indexDensity the index density
731 * @return the journal builder
732 * @throws IllegalArgumentException if the density is not between 0 and 1
734 public Builder<E> withIndexDensity(double indexDensity) {
735 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
736 this.indexDensity = indexDensity;
741 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
744 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
745 * committed in a given segment.
747 * @return The storage builder.
749 public Builder<E> withFlushOnCommit() {
750 return withFlushOnCommit(true);
754 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
757 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
758 * committed in a given segment.
760 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
761 * @return The storage builder.
763 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
764 this.flushOnCommit = flushOnCommit;
769 * Build the {@link SegmentedJournal}.
771 * @return A new {@link SegmentedJournal}.
773 public SegmentedJournal<E> build() {
774 return new SegmentedJournal<>(
781 maxEntriesPerSegment,