/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
42 public final class SegmentedJournal<E> implements Journal<E> {
45 * Returns a new Raft log builder.
47 * @return A new Raft log builder.
49 public static <E> Builder<E> builder() {
50 return new Builder<>();
53 private static final int SEGMENT_BUFFER_FACTOR = 3;
55 private final Logger log = LoggerFactory.getLogger(getClass());
56 private final String name;
57 private final StorageLevel storageLevel;
58 private final File directory;
59 private final JournalSerdes namespace;
60 private final int maxSegmentSize;
61 private final int maxEntrySize;
62 private final int maxEntriesPerSegment;
63 private final double indexDensity;
64 private final boolean flushOnCommit;
65 private final SegmentedJournalWriter<E> writer;
66 private volatile long commitIndex;
68 private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
69 private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
70 private JournalSegment<E> currentSegment;
72 private volatile boolean open = true;
/**
 * Creates a new segmented journal, loading existing segments from {@code directory}.
 *
 * @param name the storage name (segment file prefix)
 * @param storageLevel how segment entries are stored (disk vs. mapped)
 * @param directory the directory into which segment files are written
 * @param namespace the serializer for journal entries
 * @param maxSegmentSize the maximum segment size in bytes
 * @param maxEntrySize the maximum entry size in bytes
 * @param maxEntriesPerSegment the maximum number of entries per segment
 * @param indexDensity the density of the in-memory offset index
 * @param flushOnCommit whether to flush buffers on commit
 */
public SegmentedJournal(
    String name,
    StorageLevel storageLevel,
    File directory,
    JournalSerdes namespace,
    int maxSegmentSize,
    int maxEntrySize,
    int maxEntriesPerSegment,
    double indexDensity,
    boolean flushOnCommit) {
  this.name = requireNonNull(name, "name cannot be null");
  this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
  this.directory = requireNonNull(directory, "directory cannot be null");
  this.namespace = requireNonNull(namespace, "namespace cannot be null");
  this.maxSegmentSize = maxSegmentSize;
  this.maxEntrySize = maxEntrySize;
  this.maxEntriesPerSegment = maxEntriesPerSegment;
  this.indexDensity = indexDensity;
  this.flushOnCommit = flushOnCommit;
  // Load (or create) segments before the writer is opened against them.
  open();
  this.writer = openWriter();
}
98 * Returns the segment file name prefix.
100 * @return The segment file name prefix.
102 public String name() {
107 * Returns the storage directory.
109 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
110 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
111 * when the log is opened.
113 * @return The storage directory.
115 public File directory() {
120 * Returns the storage level.
122 * The storage level dictates how entries within individual journal segments should be stored.
124 * @return The storage level.
126 public StorageLevel storageLevel() {
131 * Returns the maximum journal segment size.
133 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
135 * @return The maximum segment size in bytes.
137 public int maxSegmentSize() {
138 return maxSegmentSize;
142 * Returns the maximum journal entry size.
144 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
146 * @return the maximum entry size in bytes
148 public int maxEntrySize() {
153 * Returns the maximum number of entries per segment.
155 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
158 * @return The maximum number of entries per segment.
159 * @deprecated since 3.0.2
162 public int maxEntriesPerSegment() {
163 return maxEntriesPerSegment;
167 * Returns the collection of journal segments.
169 * @return the collection of journal segments
171 public Collection<JournalSegment<E>> segments() {
172 return segments.values();
176 * Returns the collection of journal segments with indexes greater than the given index.
178 * @param index the starting index
179 * @return the journal segments starting with indexes greater than or equal to the given index
181 public Collection<JournalSegment<E>> segments(long index) {
182 return segments.tailMap(index).values();
186 * Returns the total size of the journal.
188 * @return the total size of the journal
191 return segments.values().stream()
192 .mapToLong(segment -> segment.size())
197 public SegmentedJournalWriter<E> writer() {
202 public SegmentedJournalReader<E> openReader(long index) {
203 return openReader(index, SegmentedJournalReader.Mode.ALL);
207 * Opens a new Raft log reader with the given reader mode.
209 * @param index The index from which to begin reading entries.
210 * @param mode The mode in which to read entries.
211 * @return The Raft log reader.
213 public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
214 SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
220 * Opens a new journal writer.
222 * @return A new journal writer.
224 protected SegmentedJournalWriter<E> openWriter() {
225 return new SegmentedJournalWriter<>(this);
229 * Opens the segments.
231 private synchronized void open() {
232 // Load existing log segments from disk.
233 for (JournalSegment<E> segment : loadSegments()) {
234 segments.put(segment.descriptor().index(), segment);
237 // If a segment doesn't already exist, create an initial segment starting at index 1.
238 if (!segments.isEmpty()) {
239 currentSegment = segments.lastEntry().getValue();
241 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
244 .withMaxSegmentSize(maxSegmentSize)
245 .withMaxEntries(maxEntriesPerSegment)
248 currentSegment = createSegment(descriptor);
249 currentSegment.descriptor().update(System.currentTimeMillis());
251 segments.put(1L, currentSegment);
256 * Asserts that the manager is open.
258 * @throws IllegalStateException if the segment manager is not open
260 private void assertOpen() {
261 checkState(currentSegment != null, "journal not open");
265 * Asserts that enough disk space is available to allocate a new segment.
267 private void assertDiskSpace() {
268 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
269 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
274 * Resets the current segment, creating a new segment if necessary.
276 private synchronized void resetCurrentSegment() {
277 JournalSegment<E> lastSegment = getLastSegment();
278 if (lastSegment != null) {
279 currentSegment = lastSegment;
281 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
284 .withMaxSegmentSize(maxSegmentSize)
285 .withMaxEntries(maxEntriesPerSegment)
288 currentSegment = createSegment(descriptor);
290 segments.put(1L, currentSegment);
295 * Resets and returns the first segment in the journal.
297 * @param index the starting index of the journal
298 * @return the first segment
300 JournalSegment<E> resetSegments(long index) {
303 // If the index already equals the first segment index, skip the reset.
304 JournalSegment<E> firstSegment = getFirstSegment();
305 if (index == firstSegment.index()) {
309 for (JournalSegment<E> segment : segments.values()) {
315 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
318 .withMaxSegmentSize(maxSegmentSize)
319 .withMaxEntries(maxEntriesPerSegment)
321 currentSegment = createSegment(descriptor);
322 segments.put(index, currentSegment);
323 return currentSegment;
327 * Returns the first segment in the log.
329 * @throws IllegalStateException if the segment manager is not open
331 JournalSegment<E> getFirstSegment() {
333 Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
334 return segment != null ? segment.getValue() : null;
338 * Returns the last segment in the log.
340 * @throws IllegalStateException if the segment manager is not open
342 JournalSegment<E> getLastSegment() {
344 Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
345 return segment != null ? segment.getValue() : null;
349 * Creates and returns the next segment.
351 * @return The next segment.
352 * @throws IllegalStateException if the segment manager is not open
354 synchronized JournalSegment<E> getNextSegment() {
358 JournalSegment<E> lastSegment = getLastSegment();
359 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
360 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
361 .withIndex(currentSegment.lastIndex() + 1)
362 .withMaxSegmentSize(maxSegmentSize)
363 .withMaxEntries(maxEntriesPerSegment)
366 currentSegment = createSegment(descriptor);
368 segments.put(descriptor.index(), currentSegment);
369 return currentSegment;
373 * Returns the segment following the segment with the given ID.
375 * @param index The segment index with which to look up the next segment.
376 * @return The next segment for the given index.
378 JournalSegment<E> getNextSegment(long index) {
379 Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
380 return nextSegment != null ? nextSegment.getValue() : null;
384 * Returns the segment for the given index.
386 * @param index The index for which to return the segment.
387 * @throws IllegalStateException if the segment manager is not open
389 synchronized JournalSegment<E> getSegment(long index) {
391 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
392 if (currentSegment != null && index > currentSegment.index()) {
393 return currentSegment;
396 // If the index is in another segment, get the entry with the next lowest first index.
397 Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
398 if (segment != null) {
399 return segment.getValue();
401 return getFirstSegment();
407 * @param segment The segment to remove.
409 synchronized void removeSegment(JournalSegment<E> segment) {
410 segments.remove(segment.index());
413 resetCurrentSegment();
417 * Creates a new segment.
419 JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
420 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
422 RandomAccessFile raf;
425 raf = new RandomAccessFile(segmentFile, "rw");
426 raf.setLength(descriptor.maxSegmentSize());
427 channel = raf.getChannel();
428 } catch (IOException e) {
429 throw new StorageException(e);
432 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
433 descriptor.copyTo(buffer);
436 channel.write(buffer);
437 } catch (IOException e) {
438 throw new StorageException(e);
443 } catch (IOException e) {
446 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
447 log.debug("Created segment: {}", segment);
452 * Creates a new segment instance.
454 * @param segmentFile The segment file.
455 * @param descriptor The segment descriptor.
456 * @return The segment instance.
458 protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
459 return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
465 private JournalSegment<E> loadSegment(long segmentId) {
466 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
467 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
468 try (FileChannel channel = openChannel(segmentFile)) {
469 channel.read(buffer);
471 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
472 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
473 log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
475 } catch (IOException e) {
476 throw new StorageException(e);
480 private FileChannel openChannel(File file) {
482 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
483 } catch (IOException e) {
484 throw new StorageException(e);
489 * Loads all segments from disk.
491 * @return A collection of segments for the log.
493 protected Collection<JournalSegment<E>> loadSegments() {
494 // Ensure log directories are created.
497 TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
499 // Iterate through all files in the log directory.
500 for (File file : directory.listFiles(File::isFile)) {
502 // If the file looks like a segment file, attempt to load the segment.
503 if (JournalSegmentFile.isSegmentFile(name, file)) {
504 JournalSegmentFile segmentFile = new JournalSegmentFile(file);
505 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
506 try (FileChannel channel = openChannel(file)) {
507 channel.read(buffer);
509 } catch (IOException e) {
510 throw new StorageException(e);
513 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
516 JournalSegment<E> segment = loadSegment(descriptor.id());
518 // Add the segment to the segments list.
519 log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
520 segments.put(segment.index(), segment);
524 // Verify that all the segments in the log align with one another.
525 JournalSegment<E> previousSegment = null;
526 boolean corrupted = false;
527 Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
528 while (iterator.hasNext()) {
529 JournalSegment<E> segment = iterator.next().getValue();
530 if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
531 log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
539 previousSegment = segment;
542 return segments.values();
546 * Resets journal readers to the given head.
548 * @param index The index at which to reset readers.
550 void resetHead(long index) {
551 for (SegmentedJournalReader<E> reader : readers) {
552 if (reader.getNextIndex() < index) {
559 * Resets journal readers to the given tail.
561 * @param index The index at which to reset readers.
563 void resetTail(long index) {
564 for (SegmentedJournalReader<E> reader : readers) {
565 if (reader.getNextIndex() >= index) {
571 void closeReader(SegmentedJournalReader<E> reader) {
572 readers.remove(reader);
576 public boolean isOpen() {
581 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
583 * @param index the index from which to remove segments
584 * @return indicates whether a segment can be removed from the journal
586 public boolean isCompactable(long index) {
587 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
588 return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
592 * Returns the index of the last segment in the log.
594 * @param index the compaction index
595 * @return the starting index of the last segment in the log
597 public long getCompactableIndex(long index) {
598 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
599 return segmentEntry != null ? segmentEntry.getValue().index() : 0;
603 * Compacts the journal up to the given index.
605 * The semantics of compaction are not specified by this interface.
607 * @param index The index up to which to compact the journal.
609 public void compact(long index) {
610 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
611 if (segmentEntry != null) {
612 SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
613 if (!compactSegments.isEmpty()) {
614 log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
615 for (JournalSegment<E> segment : compactSegments.values()) {
616 log.trace("Deleting segment: {}", segment);
620 compactSegments.clear();
621 resetHead(segmentEntry.getValue().index());
627 public void close() {
628 segments.values().forEach(segment -> {
629 log.debug("Closing segment: {}", segment);
632 currentSegment = null;
637 * Returns whether {@code flushOnCommit} is enabled for the log.
639 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
641 boolean isFlushOnCommit() {
642 return flushOnCommit;
646 * Commits entries up to the given index.
648 * @param index The index up to which to commit entries.
650 void setCommitIndex(long index) {
651 this.commitIndex = index;
655 * Returns the Raft log commit index.
657 * @return The Raft log commit index.
659 long getCommitIndex() {
666 public static final class Builder<E> {
667 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
668 private static final String DEFAULT_NAME = "atomix";
669 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
670 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
671 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
672 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
673 private static final double DEFAULT_INDEX_DENSITY = .005;
675 private String name = DEFAULT_NAME;
676 private StorageLevel storageLevel = StorageLevel.DISK;
677 private File directory = new File(DEFAULT_DIRECTORY);
678 private JournalSerdes namespace;
679 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
680 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
681 private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
682 private double indexDensity = DEFAULT_INDEX_DENSITY;
683 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
// Instantiate via SegmentedJournal.builder().
protected Builder() {
}
689 * Sets the storage name.
691 * @param name The storage name.
692 * @return The storage builder.
694 public Builder<E> withName(String name) {
695 this.name = requireNonNull(name, "name cannot be null");
700 * Sets the log storage level, returning the builder for method chaining.
702 * The storage level indicates how individual entries should be persisted in the journal.
704 * @param storageLevel The log storage level.
705 * @return The storage builder.
707 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
708 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
713 * Sets the log directory, returning the builder for method chaining.
715 * The log will write segment files into the provided directory.
717 * @param directory The log directory.
718 * @return The storage builder.
719 * @throws NullPointerException If the {@code directory} is {@code null}
721 public Builder<E> withDirectory(String directory) {
722 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
726 * Sets the log directory, returning the builder for method chaining.
728 * The log will write segment files into the provided directory.
730 * @param directory The log directory.
731 * @return The storage builder.
732 * @throws NullPointerException If the {@code directory} is {@code null}
734 public Builder<E> withDirectory(File directory) {
735 this.directory = requireNonNull(directory, "directory cannot be null");
740 * Sets the journal namespace, returning the builder for method chaining.
742 * @param namespace The journal serializer.
743 * @return The journal builder.
745 public Builder<E> withNamespace(JournalSerdes namespace) {
746 this.namespace = requireNonNull(namespace, "namespace cannot be null");
751 * Sets the maximum segment size in bytes, returning the builder for method chaining.
753 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
754 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
755 * segment and append new entries to that segment.
757 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
759 * @param maxSegmentSize The maximum segment size in bytes.
760 * @return The storage builder.
761 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
763 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
764 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
765 this.maxSegmentSize = maxSegmentSize;
770 * Sets the maximum entry size in bytes, returning the builder for method chaining.
772 * @param maxEntrySize the maximum entry size in bytes
773 * @return the storage builder
774 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
776 public Builder<E> withMaxEntrySize(int maxEntrySize) {
777 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
778 this.maxEntrySize = maxEntrySize;
783 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
785 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
786 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
787 * new segment and append new entries to that segment.
789 * By default, the maximum entries per segment is {@code 1024 * 1024}.
791 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
792 * @return The storage builder.
793 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
795 * @deprecated since 3.0.2
798 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
799 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
800 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
801 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
802 this.maxEntriesPerSegment = maxEntriesPerSegment;
807 * Sets the journal index density.
809 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
810 * in-memory index for faster seeking.
812 * @param indexDensity the index density
813 * @return the journal builder
814 * @throws IllegalArgumentException if the density is not between 0 and 1
816 public Builder<E> withIndexDensity(double indexDensity) {
817 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
818 this.indexDensity = indexDensity;
823 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
826 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
827 * committed in a given segment.
829 * @return The storage builder.
831 public Builder<E> withFlushOnCommit() {
832 return withFlushOnCommit(true);
836 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
839 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
840 * committed in a given segment.
842 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
843 * @return The storage builder.
845 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
846 this.flushOnCommit = flushOnCommit;
851 * Build the {@link SegmentedJournal}.
853 * @return A new {@link SegmentedJournal}.
855 public SegmentedJournal<E> build() {
856 return new SegmentedJournal<>(
863 maxEntriesPerSegment,