2 * Copyright 2017-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.storage.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

import com.google.common.collect.Sets;
import io.atomix.utils.serializer.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
44 public final class SegmentedJournal<E> implements Journal<E> {
47 * Returns a new Raft log builder.
49 * @return A new Raft log builder.
51 public static <E> Builder<E> builder() {
52 return new Builder<>();
  // Number of segment-sized chunks of free disk space required before a new
  // segment may be allocated; see assertDiskSpace().
  private static final int SEGMENT_BUFFER_FACTOR = 3;

  private final Logger log = LoggerFactory.getLogger(getClass());
  private final String name;
  private final StorageLevel storageLevel;
  private final File directory;
  private final Namespace namespace;
  private final int maxSegmentSize;
  private final int maxEntrySize;
  private final int maxEntriesPerSegment;
  private final double indexDensity;
  private final boolean flushOnCommit;
  private final SegmentedJournalWriter<E> writer;
  // Highest committed index; volatile so readers on other threads observe updates.
  private volatile long commitIndex;

  // Segments keyed by their first index; concurrent map allows lock-free lookups.
  private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
  private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
  // The segment currently being written to (always the last segment).
  private JournalSegment<E> currentSegment;

  private volatile boolean open = true;
76 public SegmentedJournal(
78 StorageLevel storageLevel,
83 int maxEntriesPerSegment,
85 boolean flushOnCommit) {
86 this.name = requireNonNull(name, "name cannot be null");
87 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
88 this.directory = requireNonNull(directory, "directory cannot be null");
89 this.namespace = requireNonNull(namespace, "namespace cannot be null");
90 this.maxSegmentSize = maxSegmentSize;
91 this.maxEntrySize = maxEntrySize;
92 this.maxEntriesPerSegment = maxEntriesPerSegment;
93 this.indexDensity = indexDensity;
94 this.flushOnCommit = flushOnCommit;
96 this.writer = openWriter();
100 * Returns the segment file name prefix.
102 * @return The segment file name prefix.
104 public String name() {
109 * Returns the storage directory.
111 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
112 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
113 * when the log is opened.
115 * @return The storage directory.
117 public File directory() {
122 * Returns the storage level.
124 * The storage level dictates how entries within individual journal segments should be stored.
126 * @return The storage level.
128 public StorageLevel storageLevel() {
133 * Returns the maximum journal segment size.
135 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
137 * @return The maximum segment size in bytes.
139 public int maxSegmentSize() {
140 return maxSegmentSize;
144 * Returns the maximum journal entry size.
146 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
148 * @return the maximum entry size in bytes
150 public int maxEntrySize() {
155 * Returns the maximum number of entries per segment.
157 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
160 * @return The maximum number of entries per segment.
161 * @deprecated since 3.0.2
164 public int maxEntriesPerSegment() {
165 return maxEntriesPerSegment;
169 * Returns the collection of journal segments.
171 * @return the collection of journal segments
173 public Collection<JournalSegment<E>> segments() {
174 return segments.values();
178 * Returns the collection of journal segments with indexes greater than the given index.
180 * @param index the starting index
181 * @return the journal segments starting with indexes greater than or equal to the given index
183 public Collection<JournalSegment<E>> segments(long index) {
184 return segments.tailMap(index).values();
188 * Returns the total size of the journal.
190 * @return the total size of the journal
193 return segments.values().stream()
194 .mapToLong(segment -> segment.size())
199 public SegmentedJournalWriter<E> writer() {
204 public SegmentedJournalReader<E> openReader(long index) {
205 return openReader(index, SegmentedJournalReader.Mode.ALL);
209 * Opens a new Raft log reader with the given reader mode.
211 * @param index The index from which to begin reading entries.
212 * @param mode The mode in which to read entries.
213 * @return The Raft log reader.
215 public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
216 SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
222 * Opens a new journal writer.
224 * @return A new journal writer.
226 protected SegmentedJournalWriter<E> openWriter() {
227 return new SegmentedJournalWriter<>(this);
231 * Opens the segments.
233 private void open() {
234 // Load existing log segments from disk.
235 for (JournalSegment<E> segment : loadSegments()) {
236 segments.put(segment.descriptor().index(), segment);
239 // If a segment doesn't already exist, create an initial segment starting at index 1.
240 if (!segments.isEmpty()) {
241 currentSegment = segments.lastEntry().getValue();
243 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
246 .withMaxSegmentSize(maxSegmentSize)
247 .withMaxEntries(maxEntriesPerSegment)
250 currentSegment = createSegment(descriptor);
251 currentSegment.descriptor().update(System.currentTimeMillis());
253 segments.put(1L, currentSegment);
258 * Asserts that the manager is open.
260 * @throws IllegalStateException if the segment manager is not open
262 private void assertOpen() {
263 checkState(currentSegment != null, "journal not open");
267 * Asserts that enough disk space is available to allocate a new segment.
269 private void assertDiskSpace() {
270 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
271 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
276 * Resets the current segment, creating a new segment if necessary.
278 private synchronized void resetCurrentSegment() {
279 JournalSegment<E> lastSegment = getLastSegment();
280 if (lastSegment != null) {
281 currentSegment = lastSegment;
283 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
286 .withMaxSegmentSize(maxSegmentSize)
287 .withMaxEntries(maxEntriesPerSegment)
290 currentSegment = createSegment(descriptor);
292 segments.put(1L, currentSegment);
297 * Resets and returns the first segment in the journal.
299 * @param index the starting index of the journal
300 * @return the first segment
302 JournalSegment<E> resetSegments(long index) {
305 // If the index already equals the first segment index, skip the reset.
306 JournalSegment<E> firstSegment = getFirstSegment();
307 if (index == firstSegment.index()) {
311 for (JournalSegment<E> segment : segments.values()) {
317 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
320 .withMaxSegmentSize(maxSegmentSize)
321 .withMaxEntries(maxEntriesPerSegment)
323 currentSegment = createSegment(descriptor);
324 segments.put(index, currentSegment);
325 return currentSegment;
329 * Returns the first segment in the log.
331 * @throws IllegalStateException if the segment manager is not open
333 JournalSegment<E> getFirstSegment() {
335 Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
336 return segment != null ? segment.getValue() : null;
340 * Returns the last segment in the log.
342 * @throws IllegalStateException if the segment manager is not open
344 JournalSegment<E> getLastSegment() {
346 Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
347 return segment != null ? segment.getValue() : null;
351 * Creates and returns the next segment.
353 * @return The next segment.
354 * @throws IllegalStateException if the segment manager is not open
356 synchronized JournalSegment<E> getNextSegment() {
360 JournalSegment<E> lastSegment = getLastSegment();
361 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
362 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
363 .withIndex(currentSegment.lastIndex() + 1)
364 .withMaxSegmentSize(maxSegmentSize)
365 .withMaxEntries(maxEntriesPerSegment)
368 currentSegment = createSegment(descriptor);
370 segments.put(descriptor.index(), currentSegment);
371 return currentSegment;
375 * Returns the segment following the segment with the given ID.
377 * @param index The segment index with which to look up the next segment.
378 * @return The next segment for the given index.
380 JournalSegment<E> getNextSegment(long index) {
381 Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
382 return nextSegment != null ? nextSegment.getValue() : null;
386 * Returns the segment for the given index.
388 * @param index The index for which to return the segment.
389 * @throws IllegalStateException if the segment manager is not open
391 synchronized JournalSegment<E> getSegment(long index) {
393 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
394 if (currentSegment != null && index > currentSegment.index()) {
395 return currentSegment;
398 // If the index is in another segment, get the entry with the next lowest first index.
399 Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
400 if (segment != null) {
401 return segment.getValue();
403 return getFirstSegment();
409 * @param segment The segment to remove.
411 synchronized void removeSegment(JournalSegment<E> segment) {
412 segments.remove(segment.index());
415 resetCurrentSegment();
419 * Creates a new segment.
421 JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
422 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
424 RandomAccessFile raf;
427 raf = new RandomAccessFile(segmentFile, "rw");
428 raf.setLength(descriptor.maxSegmentSize());
429 channel = raf.getChannel();
430 } catch (IOException e) {
431 throw new StorageException(e);
434 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
435 descriptor.copyTo(buffer);
438 channel.write(buffer);
439 } catch (IOException e) {
440 throw new StorageException(e);
445 } catch (IOException e) {
448 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
449 log.debug("Created segment: {}", segment);
454 * Creates a new segment instance.
456 * @param segmentFile The segment file.
457 * @param descriptor The segment descriptor.
458 * @return The segment instance.
460 protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
461 return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
467 private JournalSegment<E> loadSegment(long segmentId) {
468 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
469 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
470 try (FileChannel channel = openChannel(segmentFile)) {
471 channel.read(buffer);
473 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
474 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
475 log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
477 } catch (IOException e) {
478 throw new StorageException(e);
482 private FileChannel openChannel(File file) {
484 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
485 } catch (IOException e) {
486 throw new StorageException(e);
491 * Loads all segments from disk.
493 * @return A collection of segments for the log.
495 protected Collection<JournalSegment<E>> loadSegments() {
496 // Ensure log directories are created.
499 TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
501 // Iterate through all files in the log directory.
502 for (File file : directory.listFiles(File::isFile)) {
504 // If the file looks like a segment file, attempt to load the segment.
505 if (JournalSegmentFile.isSegmentFile(name, file)) {
506 JournalSegmentFile segmentFile = new JournalSegmentFile(file);
507 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
508 try (FileChannel channel = openChannel(file)) {
509 channel.read(buffer);
511 } catch (IOException e) {
512 throw new StorageException(e);
515 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
518 JournalSegment<E> segment = loadSegment(descriptor.id());
520 // Add the segment to the segments list.
521 log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
522 segments.put(segment.index(), segment);
526 // Verify that all the segments in the log align with one another.
527 JournalSegment<E> previousSegment = null;
528 boolean corrupted = false;
529 Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
530 while (iterator.hasNext()) {
531 JournalSegment<E> segment = iterator.next().getValue();
532 if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
533 log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
541 previousSegment = segment;
544 return segments.values();
548 * Resets journal readers to the given head.
550 * @param index The index at which to reset readers.
552 void resetHead(long index) {
553 for (SegmentedJournalReader<E> reader : readers) {
554 if (reader.getNextIndex() < index) {
561 * Resets journal readers to the given tail.
563 * @param index The index at which to reset readers.
565 void resetTail(long index) {
566 for (SegmentedJournalReader<E> reader : readers) {
567 if (reader.getNextIndex() >= index) {
573 void closeReader(SegmentedJournalReader<E> reader) {
574 readers.remove(reader);
578 public boolean isOpen() {
583 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
585 * @param index the index from which to remove segments
586 * @return indicates whether a segment can be removed from the journal
588 public boolean isCompactable(long index) {
589 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
590 return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
594 * Returns the index of the last segment in the log.
596 * @param index the compaction index
597 * @return the starting index of the last segment in the log
599 public long getCompactableIndex(long index) {
600 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
601 return segmentEntry != null ? segmentEntry.getValue().index() : 0;
605 * Compacts the journal up to the given index.
607 * The semantics of compaction are not specified by this interface.
609 * @param index The index up to which to compact the journal.
611 public void compact(long index) {
612 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
613 if (segmentEntry != null) {
614 SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
615 if (!compactSegments.isEmpty()) {
616 log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
617 for (JournalSegment<E> segment : compactSegments.values()) {
618 log.trace("Deleting segment: {}", segment);
622 compactSegments.clear();
623 resetHead(segmentEntry.getValue().index());
629 public void close() {
630 segments.values().forEach(segment -> {
631 log.debug("Closing segment: {}", segment);
634 currentSegment = null;
639 * Returns whether {@code flushOnCommit} is enabled for the log.
641 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
643 boolean isFlushOnCommit() {
644 return flushOnCommit;
648 * Commits entries up to the given index.
650 * @param index The index up to which to commit entries.
652 void setCommitIndex(long index) {
653 this.commitIndex = index;
657 * Returns the Raft log commit index.
659 * @return The Raft log commit index.
661 long getCommitIndex() {
668 public static final class Builder<E> {
    private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
    private static final String DEFAULT_NAME = "atomix";
    private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
    private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;   // 32 MiB
    private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;          // 1 MiB
    private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
    private static final double DEFAULT_INDEX_DENSITY = .005;

    private String name = DEFAULT_NAME;
    private StorageLevel storageLevel = StorageLevel.DISK;
    private File directory = new File(DEFAULT_DIRECTORY);
    private Namespace namespace;
    private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
    private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
    private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
    private double indexDensity = DEFAULT_INDEX_DENSITY;
    private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
687 protected Builder() {
691 * Sets the storage name.
693 * @param name The storage name.
694 * @return The storage builder.
696 public Builder<E> withName(String name) {
697 this.name = requireNonNull(name, "name cannot be null");
702 * Sets the log storage level, returning the builder for method chaining.
704 * The storage level indicates how individual entries should be persisted in the journal.
706 * @param storageLevel The log storage level.
707 * @return The storage builder.
709 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
710 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
715 * Sets the log directory, returning the builder for method chaining.
717 * The log will write segment files into the provided directory.
719 * @param directory The log directory.
720 * @return The storage builder.
721 * @throws NullPointerException If the {@code directory} is {@code null}
723 public Builder<E> withDirectory(String directory) {
724 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
728 * Sets the log directory, returning the builder for method chaining.
730 * The log will write segment files into the provided directory.
732 * @param directory The log directory.
733 * @return The storage builder.
734 * @throws NullPointerException If the {@code directory} is {@code null}
736 public Builder<E> withDirectory(File directory) {
737 this.directory = requireNonNull(directory, "directory cannot be null");
742 * Sets the journal namespace, returning the builder for method chaining.
744 * @param namespace The journal serializer.
745 * @return The journal builder.
747 public Builder<E> withNamespace(Namespace namespace) {
748 this.namespace = requireNonNull(namespace, "namespace cannot be null");
753 * Sets the maximum segment size in bytes, returning the builder for method chaining.
755 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
756 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
757 * segment and append new entries to that segment.
759 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
761 * @param maxSegmentSize The maximum segment size in bytes.
762 * @return The storage builder.
763 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
765 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
766 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
767 this.maxSegmentSize = maxSegmentSize;
772 * Sets the maximum entry size in bytes, returning the builder for method chaining.
774 * @param maxEntrySize the maximum entry size in bytes
775 * @return the storage builder
776 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
778 public Builder<E> withMaxEntrySize(int maxEntrySize) {
779 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
780 this.maxEntrySize = maxEntrySize;
785 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
787 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
788 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
789 * new segment and append new entries to that segment.
791 * By default, the maximum entries per segment is {@code 1024 * 1024}.
793 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
794 * @return The storage builder.
795 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
797 * @deprecated since 3.0.2
800 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
801 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
802 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
803 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
804 this.maxEntriesPerSegment = maxEntriesPerSegment;
809 * Sets the journal index density.
811 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
812 * in-memory index for faster seeking.
814 * @param indexDensity the index density
815 * @return the journal builder
816 * @throws IllegalArgumentException if the density is not between 0 and 1
818 public Builder<E> withIndexDensity(double indexDensity) {
819 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
820 this.indexDensity = indexDensity;
825 * Sets the journal cache size.
827 * @param cacheSize the journal cache size
828 * @return the journal builder
829 * @throws IllegalArgumentException if the cache size is not positive
830 * @deprecated since 3.0.4
833 public Builder<E> withCacheSize(int cacheSize) {
834 checkArgument(cacheSize >= 0, "cacheSize must be positive");
839 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
842 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
843 * committed in a given segment.
845 * @return The storage builder.
847 public Builder<E> withFlushOnCommit() {
848 return withFlushOnCommit(true);
852 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
855 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
856 * committed in a given segment.
858 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
859 * @return The storage builder.
861 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
862 this.flushOnCommit = flushOnCommit;
867 * Build the {@link SegmentedJournal}.
869 * @return A new {@link SegmentedJournal}.
871 public SegmentedJournal<E> build() {
872 return new SegmentedJournal<>(
879 maxEntriesPerSegment,