2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkState;
21 import static java.util.Objects.requireNonNull;
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentNavigableMap;
29 import java.util.concurrent.ConcurrentSkipListMap;
30 import java.util.function.BiFunction;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.eclipse.jdt.annotation.NonNullByDefault;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * A {@link ByteBufJournal} Implementation.
39 public final class SegmentedByteBufJournal implements ByteBufJournal {
    private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
    // Number of max-segment-size units of free disk space required before allocating a new segment
    private static final int SEGMENT_BUFFER_FACTOR = 3;

    // Live segments, keyed by the first journal index each segment covers
    private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
    // All currently-open readers; repositioned by resetHead()/resetTail()
    private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
    private final String name;
    private final StorageLevel storageLevel;
    private final File directory;
    private final int maxSegmentSize;
    private final int maxEntrySize;
    private final double indexDensity;
    private final boolean flushOnCommit;
    private final @NonNull ByteBufWriter writer;

    // Tail segment; null indicates the journal is not open (see assertOpen())
    private JournalSegment currentSegment;
    // Written via setCommitIndex(), read via getCommitIndex(); volatile for cross-thread visibility
    private volatile long commitIndex;
57 public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
58 final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
59 this.name = requireNonNull(name, "name cannot be null");
60 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
61 this.directory = requireNonNull(directory, "directory cannot be null");
62 this.maxSegmentSize = maxSegmentSize;
63 this.maxEntrySize = maxEntrySize;
64 this.indexDensity = indexDensity;
65 this.flushOnCommit = flushOnCommit;
67 writer = new SegmentedByteBufWriter(this);
71 * Returns the total size of the journal.
73 * @return the total size of the journal
76 return segments.values().stream()
77 .mapToLong(segment -> {
79 return segment.file().size();
80 } catch (IOException e) {
81 throw new StorageException(e);
88 public ByteBufWriter writer() {
93 public ByteBufReader openReader(final long index) {
94 return openReader(index, SegmentedByteBufReader::new);
98 private ByteBufReader openReader(final long index,
99 final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
100 final var reader = constructor.apply(this, segment(index));
107 public ByteBufReader openCommitsReader(final long index) {
108 return openReader(index, SegmentedCommitsByteBufReader::new);
112 * Opens the segments.
114 private synchronized void open() {
115 // Load existing log segments from disk.
116 for (var segment : loadSegments()) {
117 segments.put(segment.firstIndex(), segment);
119 // If a segment doesn't already exist, create an initial segment starting at index 1.
120 if (segments.isEmpty()) {
121 currentSegment = createSegment(1, 1);
122 segments.put(1L, currentSegment);
124 currentSegment = segments.lastEntry().getValue();
129 * Asserts that the manager is open.
131 * @throws IllegalStateException if the segment manager is not open
133 private void assertOpen() {
134 checkState(currentSegment != null, "journal not open");
138 * Asserts that enough disk space is available to allocate a new segment.
140 private void assertDiskSpace() {
141 if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
142 throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
147 * Resets the current segment, creating a new segment if necessary.
149 private synchronized void resetCurrentSegment() {
150 final var lastSegment = lastSegment();
151 if (lastSegment == null) {
152 currentSegment = createSegment(1, 1);
153 segments.put(1L, currentSegment);
155 currentSegment = lastSegment;
160 * Resets and returns the first segment in the journal.
162 * @param index the starting index of the journal
163 * @return the first segment
165 JournalSegment resetSegments(final long index) {
168 // If the index already equals the first segment index, skip the reset.
169 final var firstSegment = firstSegment();
170 if (index == firstSegment.firstIndex()) {
174 segments.values().forEach(JournalSegment::delete);
177 currentSegment = createSegment(1, index);
178 segments.put(index, currentSegment);
179 return currentSegment;
183 * Returns the first segment in the log.
185 * @throws IllegalStateException if the segment manager is not open
187 JournalSegment firstSegment() {
189 final var firstEntry = segments.firstEntry();
190 return firstEntry != null ? firstEntry.getValue() : nextSegment();
194 * Returns the last segment in the log.
196 * @throws IllegalStateException if the segment manager is not open
198 JournalSegment lastSegment() {
200 final var lastEntry = segments.lastEntry();
201 return lastEntry != null ? lastEntry.getValue() : nextSegment();
205 * Creates and returns the next segment.
207 * @return The next segment.
208 * @throws IllegalStateException if the segment manager is not open
210 synchronized JournalSegment nextSegment() {
214 final var index = currentSegment.lastIndex() + 1;
215 final var lastSegment = lastSegment();
216 currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
217 segments.put(index, currentSegment);
218 return currentSegment;
222 * Returns the segment following the segment with the given ID.
224 * @param index The segment index with which to look up the next segment.
225 * @return The next segment for the given index.
227 JournalSegment nextSegment(final long index) {
228 final var higherEntry = segments.higherEntry(index);
229 return higherEntry != null ? higherEntry.getValue() : null;
233 * Returns the segment for the given index.
235 * @param index The index for which to return the segment.
236 * @throws IllegalStateException if the segment manager is not open
238 synchronized JournalSegment segment(final long index) {
240 // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
241 if (currentSegment != null && index > currentSegment.firstIndex()) {
242 return currentSegment;
245 // If the index is in another segment, get the entry with the next lowest first index.
246 final var segment = segments.floorEntry(index);
247 return segment != null ? segment.getValue() : firstSegment();
253 * @param segment The segment to remove.
255 synchronized void removeSegment(final JournalSegment segment) {
256 segments.remove(segment.firstIndex());
258 resetCurrentSegment();
262 * Creates a new segment.
264 JournalSegment createSegment(final long id, final long index) {
265 final JournalSegmentFile file;
267 file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
270 .withMaxSegmentSize(maxSegmentSize)
271 // FIXME: propagate maxEntries
272 .withMaxEntries(Integer.MAX_VALUE)
273 .withUpdated(System.currentTimeMillis())
275 } catch (IOException e) {
276 throw new StorageException(e);
279 final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
280 LOG.debug("Created segment: {}", segment);
285 * Loads all segments from disk.
287 * @return A collection of segments for the log.
289 protected Collection<JournalSegment> loadSegments() {
290 // Ensure log directories are created.
293 final var segmentsMap = new TreeMap<Long, JournalSegment>();
295 // Iterate through all files in the log directory.
296 for (var file : directory.listFiles(File::isFile)) {
298 // If the file looks like a segment file, attempt to load the segment.
299 if (JournalSegmentFile.isSegmentFile(name, file)) {
300 final JournalSegmentFile segmentFile;
302 segmentFile = JournalSegmentFile.openExisting(file.toPath());
303 } catch (IOException e) {
304 throw new StorageException(e);
308 LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
310 // Add the segment to the segments list.
311 final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
312 segments.put(segment.firstIndex(), segment);
316 // Verify that all the segments in the log align with one another.
317 JournalSegment previousSegment = null;
318 boolean corrupted = false;
319 for (var iterator = segmentsMap.entrySet().iterator(); iterator.hasNext(); ) {
320 final var segment = iterator.next().getValue();
321 if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
322 LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
323 previousSegment.file().path());
330 previousSegment = segment;
333 return segmentsMap.values();
337 * Resets journal readers to the given head.
339 * @param index The index at which to reset readers.
341 void resetHead(final long index) {
342 for (var reader : readers) {
343 if (reader.nextIndex() < index) {
350 * Resets journal readers to the given tail.
352 * @param index The index at which to reset readers.
354 void resetTail(final long index) {
355 for (var reader : readers) {
356 if (reader.nextIndex() >= index) {
362 void closeReader(final SegmentedByteBufReader reader) {
363 readers.remove(reader);
367 * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
369 * @param index the index from which to remove segments
370 * @return indicates whether a segment can be removed from the journal
372 public boolean isCompactable(final long index) {
373 final var segmentEntry = segments.floorEntry(index);
374 return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
378 * Returns the index of the last segment in the log.
380 * @param index the compaction index
381 * @return the starting index of the last segment in the log
383 public long getCompactableIndex(final long index) {
384 final var segmentEntry = segments.floorEntry(index);
385 return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
389 * Compacts the journal up to the given index.
391 * The semantics of compaction are not specified by this interface.
393 * @param index The index up to which to compact the journal.
395 public void compact(final long index) {
396 final var segmentEntry = segments.floorEntry(index);
397 if (segmentEntry != null) {
398 final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
399 if (!compactSegments.isEmpty()) {
400 LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
401 compactSegments.values().forEach(JournalSegment::delete);
402 compactSegments.clear();
403 resetHead(segmentEntry.getValue().firstIndex());
409 public void close() {
410 if (currentSegment != null) {
411 currentSegment = null;
412 segments.values().forEach(JournalSegment::close);
418 * Returns whether {@code flushOnCommit} is enabled for the log.
420 * @return Indicates whether {@code flushOnCommit} is enabled for the log.
422 boolean isFlushOnCommit() {
423 return flushOnCommit;
427 * Updates commit index to the given value.
429 * @param index The index value.
431 void setCommitIndex(final long index) {
436 * Returns the journal last commit index.
438 * @return The journal last commit index.
440 long getCommitIndex() {
444 public static Builder builder() {
445 return new Builder();
449 * Segmented byte journal builder.
451 public static final class Builder {
        // Default configuration, used unless overridden through the with* methods below
        private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
        private static final String DEFAULT_NAME = "atomix";
        private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
        // 32 MiB segments, 1 MiB entries
        private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
        private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
        private static final double DEFAULT_INDEX_DENSITY = .005;

        private String name = DEFAULT_NAME;
        private StorageLevel storageLevel = StorageLevel.DISK;
        private File directory = new File(DEFAULT_DIRECTORY);
        private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
        private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
        private double indexDensity = DEFAULT_INDEX_DENSITY;
        private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
472 * Sets the journal name.
474 * @param name The journal name.
475 * @return The builder instance
477 public Builder withName(final String name) {
478 this.name = requireNonNull(name, "name cannot be null");
483 * Sets the storage level.
485 * @param storageLevel The storage level.
486 * @return The builder instance
488 public Builder withStorageLevel(final StorageLevel storageLevel) {
489 this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
494 * Sets the journal directory.
496 * @param directory The log directory.
497 * @return The builder instance
498 * @throws NullPointerException If the {@code directory} is {@code null}
500 public Builder withDirectory(final String directory) {
501 return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
505 * Sets the journal directory
507 * @param directory The log directory.
508 * @return The builder instance
509 * @throws NullPointerException If the {@code directory} is {@code null}
511 public Builder withDirectory(final File directory) {
512 this.directory = requireNonNull(directory, "directory cannot be null");
517 * Sets the maximum segment size in bytes.
518 * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
520 * @param maxSegmentSize The maximum segment size in bytes.
521 * @return The builder instance
522 * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
524 public Builder withMaxSegmentSize(final int maxSegmentSize) {
525 checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
526 "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
527 this.maxSegmentSize = maxSegmentSize;
532 * Sets the maximum entry size in bytes.
534 * @param maxEntrySize the maximum entry size in bytes
535 * @return the builder instance
536 * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
538 public Builder withMaxEntrySize(final int maxEntrySize) {
539 checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
540 this.maxEntrySize = maxEntrySize;
545 * Sets the journal index density.
547 * The index density is the frequency at which the position of entries written to the journal will be
548 * recorded in an in-memory index for faster seeking.
550 * @param indexDensity the index density
551 * @return the builder instance
552 * @throws IllegalArgumentException if the density is not between 0 and 1
554 public Builder withIndexDensity(final double indexDensity) {
555 checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
556 this.indexDensity = indexDensity;
561 * Enables flushing buffers to disk when entries are committed to a segment.
563 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
564 * an entry is committed in a given segment.
566 * @return The builder instance
568 public Builder withFlushOnCommit() {
569 return withFlushOnCommit(true);
573 * Sets whether to flush buffers to disk when entries are committed to a segment.
575 * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
576 * an entry is committed in a given segment.
578 * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
579 * @return The builder instance
581 public Builder withFlushOnCommit(final boolean flushOnCommit) {
582 this.flushOnCommit = flushOnCommit;
587 * Build the {@link SegmentedByteBufJournal}.
589 * @return {@link SegmentedByteBufJournal} instance built.
591 public SegmentedByteBufJournal build() {
592 return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
593 indexDensity, flushOnCommit);