/*
 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package io.atomix.storage.journal;
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkState;
21 import static java.util.Objects.requireNonNull;
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import java.util.function.BiFunction;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.eclipse.jdt.annotation.Nullable;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * A {@link ByteBufJournal} Implementation.
40 public final class SegmentedByteBufJournal implements ByteBufJournal {
41 private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
42 private static final int SEGMENT_BUFFER_FACTOR = 3;
44 private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
45 private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
46 private final String name;
47 private final StorageLevel storageLevel;
48 private final File directory;
49 private final int maxSegmentSize;
50 private final int maxEntrySize;
51 private final double indexDensity;
52 private final boolean flushOnCommit;
53 private final @NonNull ByteBufWriter writer;
56 private JournalSegment currentSegment;
57 private volatile long commitIndex;
59 public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
60 final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
61 this.name = requireNonNull(name, "name cannot be null");
62 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
63 this.directory = requireNonNull(directory, "directory cannot be null");
64 this.maxSegmentSize = maxSegmentSize;
65 this.maxEntrySize = maxEntrySize;
66 this.indexDensity = indexDensity;
67 this.flushOnCommit = flushOnCommit;
69 // Load existing log segments from disk.
70 for (var segment : loadSegments()) {
71 segments.put(segment.firstIndex(), segment);
73 currentSegment = ensureLastSegment();
75 writer = new SegmentedByteBufWriter(this);
79 * Returns the total size of the journal.
81 * @return the total size of the journal
84 return segments.values().stream()
85 .mapToLong(segment -> {
87 return segment.file().size();
88 } catch (IOException e) {
89 throw new StorageException(e);
96 public long lastIndex() {
97 return lastSegment().lastIndex();
101 public ByteBufWriter writer() {
106 public ByteBufReader openReader(final long index) {
107 return openReader(index, SegmentedByteBufReader::new);
111 private ByteBufReader openReader(final long index,
112 final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
113 final var reader = constructor.apply(this, segment(index));
120 public ByteBufReader openCommitsReader(final long index) {
121 return openReader(index, SegmentedCommitsByteBufReader::new);
125 * Asserts that the manager is open.
127 * @throws IllegalStateException if the segment manager is not open
129 private void assertOpen() {
130 checkState(currentSegment != null, "journal not open");
134 * Asserts that enough disk space is available to allocate a new segment.
136 private void assertDiskSpace() {
137 if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
138 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
143 * Resets and returns the first segment in the journal.
145 * @param index the starting index of the journal
146 * @return the first segment
148 JournalSegment resetSegments(final long index) {
151 // If the index already equals the first segment index, skip the reset.
152 final var firstSegment = firstSegment();
153 if (index == firstSegment.firstIndex()) {
157 segments.values().forEach(JournalSegment::delete);
159 final var newSegment = createInitialSegment();
160 currentSegment = newSegment;
165 * Returns the first segment in the log.
167 * @throws IllegalStateException if the segment manager is not open
169 JournalSegment firstSegment() {
171 return segments.firstEntry().getValue();
175 * Returns the last segment in the log.
177 * @throws IllegalStateException if the segment manager is not open
179 JournalSegment lastSegment() {
181 return segments.lastEntry().getValue();
185 * Returns the segment following the segment with the given ID.
187 * @param index The segment index with which to look up the next segment.
188 * @return The next segment for the given index, or {@code null} if no such segment exists
190 @Nullable JournalSegment tryNextSegment(final long index) {
191 final var higherEntry = segments.higherEntry(index);
192 return higherEntry != null ? higherEntry.getValue() : null;
196 * Creates and returns the next segment.
198 * @return The next segment.
199 * @throws IllegalStateException if the segment manager is not open
201 synchronized @NonNull JournalSegment createNextSegment() {
205 // FIXME: lastSegment should equal currentSegment. We should be asserting that.
206 final var index = currentSegment.lastIndex() + 1;
207 final var lastSegment = lastSegment();
208 final var nextSegment = createSegment(lastSegment.file().segmentId() + 1, index);
209 segments.put(index, nextSegment);
210 currentSegment = nextSegment;
215 * Returns the segment for the given index.
217 * @param index The index for which to return the segment.
218 * @throws IllegalStateException if the segment manager is not open
220 synchronized JournalSegment segment(final long index) {
222 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
223 if (currentSegment != null && index > currentSegment.firstIndex()) {
224 return currentSegment;
227 // If the index is in another segment, get the entry with the next lowest first index.
228 final var segment = segments.floorEntry(index);
229 return segment != null ? segment.getValue() : firstSegment();
235 * @param segment The segment to remove.
237 synchronized void removeSegment(final JournalSegment segment) {
238 segments.remove(segment.firstIndex());
241 // Reset current segment to last segment
242 currentSegment = ensureLastSegment();
246 * Creates a new segment.
248 * @param segmentId the segment ID
249 * @param firstIndex index of first entry
250 * @param A new segment
252 private @NonNull JournalSegment createSegment(final long segmentId, final long firstIndex) {
253 final JournalSegmentFile file;
255 file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
257 .withIndex(firstIndex)
258 .withMaxSegmentSize(maxSegmentSize)
259 // FIXME: propagate maxEntries
260 .withMaxEntries(Integer.MAX_VALUE)
261 .withUpdated(System.currentTimeMillis())
263 } catch (IOException e) {
264 throw new StorageException(e);
267 final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
268 LOG.debug("Created segment: {}", segment);
272 private @NonNull JournalSegment createInitialSegment() {
273 final var segment = createSegment(1, 1);
274 segments.put(1L, segment);
279 * Make sure there is a last segment and return it.
281 * @return the last segment
283 private JournalSegment ensureLastSegment() {
284 final var lastEntry = segments.lastEntry();
285 // if there is no segment, create an initial segment starting at index 1.
286 return lastEntry != null ? lastEntry.getValue() : createInitialSegment();
290 * Loads all segments from disk.
292 * @return A collection of segments for the log.
294 private Collection<JournalSegment> loadSegments() {
295 // Ensure log directories are created.
298 final var segmentsMap = new TreeMap<Long, JournalSegment>();
300 // Iterate through all files in the log directory.
301 for (var file : directory.listFiles(File::isFile)) {
303 // If the file looks like a segment file, attempt to load the segment.
304 if (JournalSegmentFile.isSegmentFile(name, file)) {
305 final JournalSegmentFile segmentFile;
307 segmentFile = JournalSegmentFile.openExisting(file.toPath());
308 } catch (IOException e) {
309 throw new StorageException(e);
313 LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
315 // Add the segment to the segments list.
316 final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
317 segmentsMap.put(segment.firstIndex(), segment);
321 // Verify that all the segments in the log align with one another.
322 JournalSegment previousSegment = null;
323 boolean corrupted = false;
324 for (var iterator = segmentsMap.entrySet().iterator(); iterator.hasNext(); ) {
325 final var segment = iterator.next().getValue();
326 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
327 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
328 previousSegment.file().path());
335 previousSegment = segment;
338 return segmentsMap.values();
342 * Resets journal readers to the given head.
344 * @param index The index at which to reset readers.
346 void resetHead(final long index) {
347 for (var reader : readers) {
348 if (reader.nextIndex() < index) {
355 * Resets journal readers to the given tail.
357 * @param index The index at which to reset readers.
359 void resetTail(final long index) {
360 for (var reader : readers) {
361 if (reader.nextIndex() >= index) {
367 void closeReader(final SegmentedByteBufReader reader) {
368 readers.remove(reader);
372 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
374 * @param index the index from which to remove segments
375 * @return indicates whether a segment can be removed from the journal
377 public boolean isCompactable(final long index) {
378 final var firstIndex = getCompactableIndex(index);
379 return firstIndex != 0 && !segments.headMap(firstIndex).isEmpty();
383 * Returns the index of the last segment in the log.
385 * @param index the compaction index
386 * @return the starting index of the last segment in the log
388 public long getCompactableIndex(final long index) {
389 final var segmentEntry = segments.floorEntry(index);
390 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
394 * Compacts the journal up to the given index.
397 * The semantics of compaction are not specified by this interface.
399 * @param index The index up to which to compact the journal.
401 public void compact(final long index) {
402 final var firstIndex = getCompactableIndex(index);
403 if (firstIndex != 0) {
404 final var compactSegments = segments.headMap(firstIndex);
405 if (!compactSegments.isEmpty()) {
406 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
407 compactSegments.values().forEach(JournalSegment::delete);
408 compactSegments.clear();
409 resetHead(firstIndex);
415 public void close() {
416 if (currentSegment != null) {
417 currentSegment = null;
418 segments.values().forEach(JournalSegment::close);
424 * Returns whether {@code flushOnCommit} is enabled for the log.
426 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
428 boolean isFlushOnCommit() {
429 return flushOnCommit;
433 * Updates commit index to the given value.
435 * @param index The index value.
437 void setCommitIndex(final long index) {
442 * Returns the journal last commit index.
444 * @return The journal last commit index.
446 long getCommitIndex() {
450 public static Builder builder() {
451 return new Builder();
455 * Segmented byte journal builder.
457 public static final class Builder {
458 private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
459 private static final String DEFAULT_NAME = "atomix";
460 private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
461 private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
462 private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
463 private static final double DEFAULT_INDEX_DENSITY = .005;
465 private String name = DEFAULT_NAME;
466 private StorageLevel storageLevel = StorageLevel.DISK;
467 private File directory = new File(DEFAULT_DIRECTORY);
468 private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
469 private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
470 private double indexDensity = DEFAULT_INDEX_DENSITY;
471 private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
478 * Sets the journal name.
480 * @param name The journal name.
481 * @return The builder instance
483 public Builder withName(final String name) {
484 this.name = requireNonNull(name, "name cannot be null");
489 * Sets the storage level.
491 * @param storageLevel The storage level.
492 * @return The builder instance
494 public Builder withStorageLevel(final StorageLevel storageLevel) {
495 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
500 * Sets the journal directory.
502 * @param directory The log directory.
503 * @return The builder instance
504 * @throws NullPointerException If the {@code directory} is {@code null}
506 public Builder withDirectory(final String directory) {
507 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
511 * Sets the journal directory.
513 * @param directory The log directory.
514 * @return The builder instance
515 * @throws NullPointerException If the {@code directory} is {@code null}
517 public Builder withDirectory(final File directory) {
518 this.directory = requireNonNull(directory, "directory cannot be null");
523 * Sets the maximum segment size in bytes.
524 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
526 * @param maxSegmentSize The maximum segment size in bytes.
527 * @return The builder instance
528 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
530 public Builder withMaxSegmentSize(final int maxSegmentSize) {
531 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
532 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
533 this.maxSegmentSize = maxSegmentSize;
538 * Sets the maximum entry size in bytes.
540 * @param maxEntrySize the maximum entry size in bytes
541 * @return the builder instance
542 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
544 public Builder withMaxEntrySize(final int maxEntrySize) {
545 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
546 this.maxEntrySize = maxEntrySize;
551 * Sets the journal index density.
554 * The index density is the frequency at which the position of entries written to the journal will be
555 * recorded in an in-memory index for faster seeking.
557 * @param indexDensity the index density
558 * @return the builder instance
559 * @throws IllegalArgumentException if the density is not between 0 and 1
561 public Builder withIndexDensity(final double indexDensity) {
562 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
563 this.indexDensity = indexDensity;
568 * Enables flushing buffers to disk when entries are committed to a segment.
571 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
572 * an entry is committed in a given segment.
574 * @return The builder instance
576 public Builder withFlushOnCommit() {
577 return withFlushOnCommit(true);
581 * Sets whether to flush buffers to disk when entries are committed to a segment.
584 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
585 * an entry is committed in a given segment.
587 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
588 * @return The builder instance
590 public Builder withFlushOnCommit(final boolean flushOnCommit) {
591 this.flushOnCommit = flushOnCommit;
596 * Build the {@link SegmentedByteBufJournal}.
598 * @return {@link SegmentedByteBufJournal} instance built.
600 public SegmentedByteBufJournal build() {
601 return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
602 indexDensity, flushOnCommit);