/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.storage.journal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

import com.google.common.collect.Sets;
import io.atomix.storage.StorageException;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.serializer.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
46 public class SegmentedJournal<E> implements Journal<E> {
49 * Returns a new Raft log builder.
51 * @return A new Raft log builder.
53 public static <E> Builder<E> builder() {
54 return new Builder<>();
57 private static final int SEGMENT_BUFFER_FACTOR = 3;
59 private final Logger log = LoggerFactory.getLogger(getClass());
60 private final String name;
61 private final StorageLevel storageLevel;
62 private final File directory;
63 private final Namespace namespace;
64 private final int maxSegmentSize;
65 private final int maxEntrySize;
66 private final int maxEntriesPerSegment;
67 private final double indexDensity;
68 private final boolean flushOnCommit;
69 private final SegmentedJournalWriter<E> writer;
70 private volatile long commitIndex;
72 private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
73 private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
74 private JournalSegment<E> currentSegment;
76 private volatile boolean open = true;
78 public SegmentedJournal(
80 StorageLevel storageLevel,
85 int maxEntriesPerSegment,
87 boolean flushOnCommit) {
88 this.name = checkNotNull(name, "name cannot be null");
89 this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
90 this.directory = checkNotNull(directory, "directory cannot be null");
91 this.namespace = checkNotNull(namespace, "namespace cannot be null");
92 this.maxSegmentSize = maxSegmentSize;
93 this.maxEntrySize = maxEntrySize;
94 this.maxEntriesPerSegment = maxEntriesPerSegment;
95 this.indexDensity = indexDensity;
96 this.flushOnCommit = flushOnCommit;
98 this.writer = openWriter();
102 * Returns the segment file name prefix.
104 * @return The segment file name prefix.
106 public String name() {
111 * Returns the storage directory.
113 * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
114 * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
115 * when the log is opened.
117 * @return The storage directory.
119 public File directory() {
124 * Returns the storage level.
126 * The storage level dictates how entries within individual journal segments should be stored.
128 * @return The storage level.
130 public StorageLevel storageLevel() {
135 * Returns the maximum journal segment size.
137 * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
139 * @return The maximum segment size in bytes.
141 public int maxSegmentSize() {
142 return maxSegmentSize;
146 * Returns the maximum journal entry size.
148 * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
150 * @return the maximum entry size in bytes
152 public int maxEntrySize() {
157 * Returns the maximum number of entries per segment.
159 * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
162 * @return The maximum number of entries per segment.
163 * @deprecated since 3.0.2
166 public int maxEntriesPerSegment() {
167 return maxEntriesPerSegment;
171 * Returns the collection of journal segments.
173 * @return the collection of journal segments
175 public Collection<JournalSegment<E>> segments() {
176 return segments.values();
180 * Returns the collection of journal segments with indexes greater than the given index.
182 * @param index the starting index
183 * @return the journal segments starting with indexes greater than or equal to the given index
185 public Collection<JournalSegment<E>> segments(long index) {
186 return segments.tailMap(index).values();
190 * Returns the total size of the journal.
192 * @return the total size of the journal
195 return segments.values().stream()
196 .mapToLong(segment -> segment.size())
201 public SegmentedJournalWriter<E> writer() {
206 public SegmentedJournalReader<E> openReader(long index) {
207 return openReader(index, SegmentedJournalReader.Mode.ALL);
211 * Opens a new Raft log reader with the given reader mode.
213 * @param index The index from which to begin reading entries.
214 * @param mode The mode in which to read entries.
215 * @return The Raft log reader.
217 public SegmentedJournalReader<E> openReader(long index, SegmentedJournalReader.Mode mode) {
218 SegmentedJournalReader<E> reader = new SegmentedJournalReader<>(this, index, mode);
224 * Opens a new journal writer.
226 * @return A new journal writer.
228 protected SegmentedJournalWriter<E> openWriter() {
229 return new SegmentedJournalWriter<>(this);
233 * Opens the segments.
235 private void open() {
236 // Load existing log segments from disk.
237 for (JournalSegment<E> segment : loadSegments()) {
238 segments.put(segment.descriptor().index(), segment);
241 // If a segment doesn't already exist, create an initial segment starting at index 1.
242 if (!segments.isEmpty()) {
243 currentSegment = segments.lastEntry().getValue();
245 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
248 .withMaxSegmentSize(maxSegmentSize)
249 .withMaxEntries(maxEntriesPerSegment)
252 currentSegment = createSegment(descriptor);
253 currentSegment.descriptor().update(System.currentTimeMillis());
255 segments.put(1L, currentSegment);
260 * Asserts that the manager is open.
262 * @throws IllegalStateException if the segment manager is not open
264 private void assertOpen() {
265 checkState(currentSegment != null, "journal not open");
269 * Asserts that enough disk space is available to allocate a new segment.
271 private void assertDiskSpace() {
272 if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
273 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
278 * Resets the current segment, creating a new segment if necessary.
280 private synchronized void resetCurrentSegment() {
281 JournalSegment<E> lastSegment = getLastSegment();
282 if (lastSegment != null) {
283 currentSegment = lastSegment;
285 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
288 .withMaxSegmentSize(maxSegmentSize)
289 .withMaxEntries(maxEntriesPerSegment)
292 currentSegment = createSegment(descriptor);
294 segments.put(1L, currentSegment);
299 * Resets and returns the first segment in the journal.
301 * @param index the starting index of the journal
302 * @return the first segment
304 JournalSegment<E> resetSegments(long index) {
307 // If the index already equals the first segment index, skip the reset.
308 JournalSegment<E> firstSegment = getFirstSegment();
309 if (index == firstSegment.index()) {
313 for (JournalSegment<E> segment : segments.values()) {
319 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
322 .withMaxSegmentSize(maxSegmentSize)
323 .withMaxEntries(maxEntriesPerSegment)
325 currentSegment = createSegment(descriptor);
326 segments.put(index, currentSegment);
327 return currentSegment;
331 * Returns the first segment in the log.
333 * @throws IllegalStateException if the segment manager is not open
335 JournalSegment<E> getFirstSegment() {
337 Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
338 return segment != null ? segment.getValue() : null;
342 * Returns the last segment in the log.
344 * @throws IllegalStateException if the segment manager is not open
346 JournalSegment<E> getLastSegment() {
348 Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
349 return segment != null ? segment.getValue() : null;
353 * Creates and returns the next segment.
355 * @return The next segment.
356 * @throws IllegalStateException if the segment manager is not open
358 synchronized JournalSegment<E> getNextSegment() {
362 JournalSegment<E> lastSegment = getLastSegment();
363 JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
364 .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
365 .withIndex(currentSegment.lastIndex() + 1)
366 .withMaxSegmentSize(maxSegmentSize)
367 .withMaxEntries(maxEntriesPerSegment)
370 currentSegment = createSegment(descriptor);
372 segments.put(descriptor.index(), currentSegment);
373 return currentSegment;
377 * Returns the segment following the segment with the given ID.
379 * @param index The segment index with which to look up the next segment.
380 * @return The next segment for the given index.
382 JournalSegment<E> getNextSegment(long index) {
383 Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
384 return nextSegment != null ? nextSegment.getValue() : null;
388 * Returns the segment for the given index.
390 * @param index The index for which to return the segment.
391 * @throws IllegalStateException if the segment manager is not open
393 synchronized JournalSegment<E> getSegment(long index) {
395 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
396 if (currentSegment != null && index > currentSegment.index()) {
397 return currentSegment;
400 // If the index is in another segment, get the entry with the next lowest first index.
401 Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
402 if (segment != null) {
403 return segment.getValue();
405 return getFirstSegment();
411 * @param segment The segment to remove.
413 synchronized void removeSegment(JournalSegment<E> segment) {
414 segments.remove(segment.index());
417 resetCurrentSegment();
421 * Creates a new segment.
423 JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
424 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
426 RandomAccessFile raf;
429 raf = new RandomAccessFile(segmentFile, "rw");
430 raf.setLength(descriptor.maxSegmentSize());
431 channel = raf.getChannel();
432 } catch (IOException e) {
433 throw new StorageException(e);
436 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
437 descriptor.copyTo(buffer);
440 channel.write(buffer);
441 } catch (IOException e) {
442 throw new StorageException(e);
447 } catch (IOException e) {
450 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
451 log.debug("Created segment: {}", segment);
456 * Creates a new segment instance.
458 * @param segmentFile The segment file.
459 * @param descriptor The segment descriptor.
460 * @return The segment instance.
462 protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
463 return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
469 private JournalSegment<E> loadSegment(long segmentId) {
470 File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
471 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
472 try (FileChannel channel = openChannel(segmentFile)) {
473 channel.read(buffer);
475 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
476 JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
477 log.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
479 } catch (IOException e) {
480 throw new StorageException(e);
484 private FileChannel openChannel(File file) {
486 return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
487 } catch (IOException e) {
488 throw new StorageException(e);
493 * Loads all segments from disk.
495 * @return A collection of segments for the log.
497 protected Collection<JournalSegment<E>> loadSegments() {
498 // Ensure log directories are created.
501 TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
503 // Iterate through all files in the log directory.
504 for (File file : directory.listFiles(File::isFile)) {
506 // If the file looks like a segment file, attempt to load the segment.
507 if (JournalSegmentFile.isSegmentFile(name, file)) {
508 JournalSegmentFile segmentFile = new JournalSegmentFile(file);
509 ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
510 try (FileChannel channel = openChannel(file)) {
511 channel.read(buffer);
513 } catch (IOException e) {
514 throw new StorageException(e);
517 JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
520 JournalSegment<E> segment = loadSegment(descriptor.id());
522 // Add the segment to the segments list.
523 log.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
524 segments.put(segment.index(), segment);
528 // Verify that all the segments in the log align with one another.
529 JournalSegment<E> previousSegment = null;
530 boolean corrupted = false;
531 Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
532 while (iterator.hasNext()) {
533 JournalSegment<E> segment = iterator.next().getValue();
534 if (previousSegment != null && previousSegment.lastIndex() != segment.index() - 1) {
535 log.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
543 previousSegment = segment;
546 return segments.values();
550 * Resets journal readers to the given head.
552 * @param index The index at which to reset readers.
554 void resetHead(long index) {
555 for (SegmentedJournalReader<E> reader : readers) {
556 if (reader.getNextIndex() < index) {
563 * Resets journal readers to the given tail.
565 * @param index The index at which to reset readers.
567 void resetTail(long index) {
568 for (SegmentedJournalReader<E> reader : readers) {
569 if (reader.getNextIndex() >= index) {
575 void closeReader(SegmentedJournalReader<E> reader) {
576 readers.remove(reader);
580 public boolean isOpen() {
585 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
587 * @param index the index from which to remove segments
588 * @return indicates whether a segment can be removed from the journal
590 public boolean isCompactable(long index) {
591 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
592 return segmentEntry != null && segments.headMap(segmentEntry.getValue().index()).size() > 0;
596 * Returns the index of the last segment in the log.
598 * @param index the compaction index
599 * @return the starting index of the last segment in the log
601 public long getCompactableIndex(long index) {
602 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
603 return segmentEntry != null ? segmentEntry.getValue().index() : 0;
607 * Compacts the journal up to the given index.
609 * The semantics of compaction are not specified by this interface.
611 * @param index The index up to which to compact the journal.
613 public void compact(long index) {
614 Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
615 if (segmentEntry != null) {
616 SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
617 if (!compactSegments.isEmpty()) {
618 log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
619 for (JournalSegment<E> segment : compactSegments.values()) {
620 log.trace("Deleting segment: {}", segment);
624 compactSegments.clear();
625 resetHead(segmentEntry.getValue().index());
631 public void close() {
632 segments.values().forEach(segment -> {
633 log.debug("Closing segment: {}", segment);
636 currentSegment = null;
641 * Returns whether {@code flushOnCommit} is enabled for the log.
643 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
645 boolean isFlushOnCommit() {
646 return flushOnCommit;
650 * Commits entries up to the given index.
652 * @param index The index up to which to commit entries.
654 void setCommitIndex(long index) {
655 this.commitIndex = index;
659 * Returns the Raft log commit index.
661 * @return The Raft log commit index.
663 long getCommitIndex() {
670 public static class Builder<E> {
671 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
672 private static final String DEFAULT_NAME = "atomix";
673 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
674 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
675 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
676 private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
677 private static final double DEFAULT_INDEX_DENSITY = .005;
678 private static final int DEFAULT_CACHE_SIZE = 1024;
680 protected String name = DEFAULT_NAME;
681 protected StorageLevel storageLevel = StorageLevel.DISK;
682 protected File directory = new File(DEFAULT_DIRECTORY);
683 protected Namespace namespace;
684 protected int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
685 protected int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
686 protected int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
687 protected double indexDensity = DEFAULT_INDEX_DENSITY;
688 protected int cacheSize = DEFAULT_CACHE_SIZE;
689 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
691 protected Builder() {
695 * Sets the storage name.
697 * @param name The storage name.
698 * @return The storage builder.
700 public Builder<E> withName(String name) {
701 this.name = checkNotNull(name, "name cannot be null");
706 * Sets the log storage level, returning the builder for method chaining.
708 * The storage level indicates how individual entries should be persisted in the journal.
710 * @param storageLevel The log storage level.
711 * @return The storage builder.
713 public Builder<E> withStorageLevel(StorageLevel storageLevel) {
714 this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
719 * Sets the log directory, returning the builder for method chaining.
721 * The log will write segment files into the provided directory.
723 * @param directory The log directory.
724 * @return The storage builder.
725 * @throws NullPointerException If the {@code directory} is {@code null}
727 public Builder<E> withDirectory(String directory) {
728 return withDirectory(new File(checkNotNull(directory, "directory cannot be null")));
732 * Sets the log directory, returning the builder for method chaining.
734 * The log will write segment files into the provided directory.
736 * @param directory The log directory.
737 * @return The storage builder.
738 * @throws NullPointerException If the {@code directory} is {@code null}
740 public Builder<E> withDirectory(File directory) {
741 this.directory = checkNotNull(directory, "directory cannot be null");
746 * Sets the journal namespace, returning the builder for method chaining.
748 * @param namespace The journal serializer.
749 * @return The journal builder.
751 public Builder<E> withNamespace(Namespace namespace) {
752 this.namespace = checkNotNull(namespace, "namespace cannot be null");
757 * Sets the maximum segment size in bytes, returning the builder for method chaining.
759 * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
760 * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
761 * segment and append new entries to that segment.
763 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
765 * @param maxSegmentSize The maximum segment size in bytes.
766 * @return The storage builder.
767 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
769 public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
770 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES, "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
771 this.maxSegmentSize = maxSegmentSize;
776 * Sets the maximum entry size in bytes, returning the builder for method chaining.
778 * @param maxEntrySize the maximum entry size in bytes
779 * @return the storage builder
780 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
782 public Builder<E> withMaxEntrySize(int maxEntrySize) {
783 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
784 this.maxEntrySize = maxEntrySize;
789 * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
791 * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
792 * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
793 * new segment and append new entries to that segment.
795 * By default, the maximum entries per segment is {@code 1024 * 1024}.
797 * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
798 * @return The storage builder.
799 * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
801 * @deprecated since 3.0.2
804 public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
805 checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
806 checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
807 "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
808 this.maxEntriesPerSegment = maxEntriesPerSegment;
813 * Sets the journal index density.
815 * The index density is the frequency at which the position of entries written to the journal will be recorded in an
816 * in-memory index for faster seeking.
818 * @param indexDensity the index density
819 * @return the journal builder
820 * @throws IllegalArgumentException if the density is not between 0 and 1
822 public Builder<E> withIndexDensity(double indexDensity) {
823 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
824 this.indexDensity = indexDensity;
829 * Sets the journal cache size.
831 * @param cacheSize the journal cache size
832 * @return the journal builder
833 * @throws IllegalArgumentException if the cache size is not positive
834 * @deprecated since 3.0.4
837 public Builder<E> withCacheSize(int cacheSize) {
838 checkArgument(cacheSize >= 0, "cacheSize must be positive");
839 this.cacheSize = cacheSize;
844 * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
847 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
848 * committed in a given segment.
850 * @return The storage builder.
852 public Builder<E> withFlushOnCommit() {
853 return withFlushOnCommit(true);
857 * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
860 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
861 * committed in a given segment.
863 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
864 * @return The storage builder.
866 public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
867 this.flushOnCommit = flushOnCommit;
872 * Build the {@link SegmentedJournal}.
874 * @return A new {@link SegmentedJournal}.
876 public SegmentedJournal<E> build() {
877 return new SegmentedJournal<>(
884 maxEntriesPerSegment,