--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * A journal of byte arrays. Provides the ability to write modify entries via {@link ByteBufWriter} and read them
+ * back via {@link ByteBufReader}.
+ */
+@NonNullByDefault
+public interface ByteBufJournal extends AutoCloseable {
+ /**
+ * Returns the journal writer.
+ *
+ * @return The journal writer.
+ */
+ ByteBufWriter writer();
+
+ /**
+ * Opens a new {@link ByteBufReader} reading all entries.
+ *
+ * @param index The index at which to start the reader.
+ * @return A new journal reader.
+ */
+ ByteBufReader openReader(long index);
+
+ /**
+ * Opens a new {@link ByteBufReader} reading only committed entries.
+ *
+ * @param index The index at which to start the reader.
+ * @return A new journal reader.
+ */
+ ByteBufReader openCommitsReader(long index);
+
+ @Override
+ void close();
+}
*/
package io.atomix.storage.journal;
+import io.netty.buffer.ByteBuf;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
- * A {@link JournalReader} traversing only committed entries.
+ * Support for serialization of {@link ByteBufJournal} entries.
*/
@NonNullByDefault
-final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
- CommitsSegmentJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
- super(journal, segment);
- }
+public interface ByteBufMapper<T> {
+ /**
+ * Converts an object into a series of bytes in a {@link ByteBuf}.
+ *
+ * @param obj the object
+ * @return resulting buffer
+ */
+ ByteBuf objectToBytes(T obj) ;
- @Override
- public <T> T tryNext(final EntryMapper<E, T> mapper) {
- return getNextIndex() <= journal.getCommitIndex() ? super.tryNext(mapper) : null;
- }
+ /**
+ * Converts the contents of a {@link ByteBuf} to an object.
+ *
+ * @param buf buffer to convert
+ * @return resulting object
+ */
+ T bytesToObject(ByteBuf buf);
}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+
+/**
+ * A reader of {@link ByteBufJournal} entries.
+ */
+@NonNullByDefault
+public interface ByteBufReader extends AutoCloseable {
+ /**
+ * A journal entry processor. Responsible for transforming bytes into their internal representation.
+ *
+ * @param <T> Internal representation type
+ */
+ @FunctionalInterface
+ interface EntryMapper<T> {
+ /**
+ * Process an entry.
+ *
+ * @param index entry index
+ * @param bytes entry bytes
+ * @return resulting internal representation
+ */
+ T mapEntry(long index, ByteBuf bytes);
+ }
+
+ /**
+ * Returns the first index in the journal.
+ *
+ * @return The first index in the journal
+ */
+ long firstIndex();
+
+ /**
+ * Returns the next reader index.
+ *
+ * @return The next reader index
+ */
+ long nextIndex();
+
+ /**
+ * Try to move to the next binary data block
+ *
+ * @param entryMapper callback to be invoked on binary data
+ * @return processed binary data, or {@code null}
+ */
+ <T> @Nullable T tryNext(EntryMapper<T> entryMapper);
+
+ /**
+ * Resets the reader to the start.
+ */
+ void reset();
+
+ /**
+ * Resets the reader to the given index.
+ *
+ * @param index The index to which to reset the reader
+ */
+ void reset(long index);
+
+ @Override
+ void close();
+}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * A writer of {@link ByteBufJournal} entries.
+ */
+@NonNullByDefault
+public interface ByteBufWriter {
+ /**
+ * Returns the last written index.
+ *
+ * @return The last written index
+ */
+ long lastIndex();
+
+ /**
+ * Returns the next index to be written.
+ *
+ * @return The next index to be written
+ */
+ long nextIndex();
+
+ /**
+ * Appends an entry to the journal.
+ *
+ * @param bytes Data block to append
+ * @return The index of appended data block
+ */
+ // FIXME: throws IOException
+ long append(ByteBuf bytes);
+
+ /**
+ * Commits entries up to the given index.
+ *
+ * @param index The index up to which to commit entries.
+ */
+ void commit(long index);
+
+ /**
+ * Resets the head of the journal to the given index.
+ *
+ * @param index The index to which to reset the head of the journal
+ */
+ // FIXME: reconcile with reader's reset and truncate()
+ // FIXME: throws IOException
+ void reset(long index);
+
+ /**
+ * Truncates the log to the given index.
+ *
+ * @param index The index to which to truncate the log.
+ */
+ // FIXME: reconcile with reset()
+ // FIXME: throws IOException
+ void truncate(long index);
+
+ /**
+ * Flushes written entries to disk.
+ */
+ // FIXME: throws IOException
+ void flush();
+}
/**
* Try to move to the next entry.
*
- * @param mapper callback to be invoked for the entry
+ * @param entryMapper callback to be invoked for the entry
* @return processed entry, or {@code null}
*/
- <T> @Nullable T tryNext(EntryMapper<E, T> mapper);
+ <T> @Nullable T tryNext(EntryMapper<E, T> entryMapper);
/**
* Resets the reader to the start.
/**
* Reads the next binary data block
*
- * @param index entry index
* @return The binary data, or {@code null}
*/
- @Nullable ByteBuf readBytes(final long index) {
+ @Nullable ByteBuf readBytes() {
// Check if there is enough in the buffer remaining
final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
if (remaining < 0) {
position += SegmentEntry.HEADER_BYTES + length;
// rewind and return
- return Unpooled.buffer(length).writeBytes(entryBuffer.rewind());
+ return Unpooled.wrappedBuffer(entryBuffer.rewind());
}
/**
reader.setPosition(JournalSegmentDescriptor.BYTES);
while (index == 0 || nextIndex <= index) {
- final var buf = reader.readBytes(nextIndex);
+ final var buf = reader.readBytes();
if (buf == null) {
break;
}
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Support for serialization of {@link Journal} entries.
*
- * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead.
+ * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead.
*/
@Deprecated(forRemoval = true, since="9.0.3")
public interface JournalSerdes {
*/
<T> T deserialize(final InputStream stream, final int bufferSize);
+ /**
+ * Returns a {@link ByteBufMapper} backed by this object.
+ *
+ * @return a {@link ByteBufMapper} backed by this object
+ */
+ default <T> ByteBufMapper<T> toMapper() {
+ return new ByteBufMapper<>() {
+ @Override
+ public ByteBuf objectToBytes(final T obj) {
+ return Unpooled.wrappedBuffer(serialize(obj));
+ }
+
+ @Override
+ public T bytesToObject(final ByteBuf buf) {
+ // FIXME: ByteBufUtil creates a copy -- we do not want to do that!
+ return deserialize(ByteBufUtil.getBytes(buf));
+ }
+ };
+ }
+
/**
* Creates a new {@link JournalSerdes} builder.
*
+++ /dev/null
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package io.atomix.storage.journal;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.Unpooled;
-
-/**
- * Support for serialization of {@link Journal} entries.
- */
-public interface JournalSerializer<T> {
-
- /**
- * Serializes given object to byte array.
- *
- * @param obj Object to serialize
- * @return serialized bytes as {@link ByteBuf}
- */
- ByteBuf serialize(T obj) ;
-
- /**
- * Deserializes given byte array to Object.
- *
- * @param buf serialized bytes as {@link ByteBuf}
- * @return deserialized Object
- */
- T deserialize(final ByteBuf buf);
-
- static <E> JournalSerializer<E> wrap(final JournalSerdes serdes) {
- return new JournalSerializer<>() {
- @Override
- public ByteBuf serialize(final E obj) {
- return Unpooled.wrappedBuffer(serdes.serialize(obj));
- }
-
- @Override
- public E deserialize(final ByteBuf buf) {
- return serdes.deserialize(ByteBufUtil.getBytes(buf));
- }
- };
- }
-}
*
* @param index the index to which to reset the head of the journal
*/
+ // FIXME: reconcile with reader's reset and truncate()
void reset(long index);
/**
*
* @param index The index to which to truncate the log.
*/
+ // FIXME: reconcile with reset()
void truncate(long index);
/**
--- /dev/null
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.BiFunction;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ByteBufJournal} Implementation.
+ */
+public final class SegmentedByteBufJournal implements ByteBufJournal {
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
+ private static final int SEGMENT_BUFFER_FACTOR = 3;
+
+ private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
+ private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
+ private final String name;
+ private final StorageLevel storageLevel;
+ private final File directory;
+ private final int maxSegmentSize;
+ private final int maxEntrySize;
+ private final double indexDensity;
+ private final boolean flushOnCommit;
+ private final @NonNull ByteBufWriter writer;
+
+ private JournalSegment currentSegment;
+ private volatile long commitIndex;
+
+ public SegmentedByteBufJournal(final String name, final StorageLevel storageLevel, final File directory,
+ final int maxSegmentSize, final int maxEntrySize, final double indexDensity, final boolean flushOnCommit) {
+ this.name = requireNonNull(name, "name cannot be null");
+ this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
+ this.directory = requireNonNull(directory, "directory cannot be null");
+ this.maxSegmentSize = maxSegmentSize;
+ this.maxEntrySize = maxEntrySize;
+ this.indexDensity = indexDensity;
+ this.flushOnCommit = flushOnCommit;
+ open();
+ writer = new SegmentedByteBufWriter(this);
+ }
+
+ /**
+ * Returns the total size of the journal.
+ *
+ * @return the total size of the journal
+ */
+ public long size() {
+ return segments.values().stream()
+ .mapToLong(segment -> {
+ try {
+ return segment.file().size();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ })
+ .sum();
+ }
+
+ @Override
+ public ByteBufWriter writer() {
+ return writer;
+ }
+
+ @Override
+ public ByteBufReader openReader(final long index) {
+ return openReader(index, SegmentedByteBufReader::new);
+ }
+
+ @NonNullByDefault
+ private ByteBufReader openReader(final long index,
+ final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
+ final var reader = constructor.apply(this, segment(index));
+ reader.reset(index);
+ readers.add(reader);
+ return reader;
+ }
+
+ @Override
+ public ByteBufReader openCommitsReader(final long index) {
+ return openReader(index, SegmentedCommitsByteBufReader::new);
+ }
+
+ /**
+ * Opens the segments.
+ */
+ private synchronized void open() {
+ // Load existing log segments from disk.
+ for (var segment : loadSegments()) {
+ segments.put(segment.firstIndex(), segment);
+ }
+ // If a segment doesn't already exist, create an initial segment starting at index 1.
+ if (segments.isEmpty()) {
+ currentSegment = createSegment(1, 1);
+ segments.put(1L, currentSegment);
+ } else {
+ currentSegment = segments.lastEntry().getValue();
+ }
+ }
+
+ /**
+ * Asserts that the manager is open.
+ *
+ * @throws IllegalStateException if the segment manager is not open
+ */
+ private void assertOpen() {
+ checkState(currentSegment != null, "journal not open");
+ }
+
+ /**
+ * Asserts that enough disk space is available to allocate a new segment.
+ */
+ private void assertDiskSpace() {
+ if (directory.getUsableSpace() < maxSegmentSize * SEGMENT_BUFFER_FACTOR) {
+ throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
+ }
+ }
+
+ /**
+ * Resets the current segment, creating a new segment if necessary.
+ */
+ private synchronized void resetCurrentSegment() {
+ final var lastSegment = lastSegment();
+ if (lastSegment == null) {
+ currentSegment = createSegment(1, 1);
+ segments.put(1L, currentSegment);
+ } else {
+ currentSegment = lastSegment;
+ }
+ }
+
+ /**
+ * Resets and returns the first segment in the journal.
+ *
+ * @param index the starting index of the journal
+ * @return the first segment
+ */
+ JournalSegment resetSegments(final long index) {
+ assertOpen();
+
+ // If the index already equals the first segment index, skip the reset.
+ final var firstSegment = firstSegment();
+ if (index == firstSegment.firstIndex()) {
+ return firstSegment;
+ }
+
+ segments.values().forEach(JournalSegment::delete);
+ segments.clear();
+
+ currentSegment = createSegment(1, index);
+ segments.put(index, currentSegment);
+ return currentSegment;
+ }
+
+ /**
+ * Returns the first segment in the log.
+ *
+ * @throws IllegalStateException if the segment manager is not open
+ */
+ JournalSegment firstSegment() {
+ assertOpen();
+ final var firstEntry = segments.firstEntry();
+ return firstEntry != null ? firstEntry.getValue() : nextSegment();
+ }
+
+ /**
+ * Returns the last segment in the log.
+ *
+ * @throws IllegalStateException if the segment manager is not open
+ */
+ JournalSegment lastSegment() {
+ assertOpen();
+ final var lastEntry = segments.lastEntry();
+ return lastEntry != null ? lastEntry.getValue() : nextSegment();
+ }
+
+ /**
+ * Creates and returns the next segment.
+ *
+ * @return The next segment.
+ * @throws IllegalStateException if the segment manager is not open
+ */
+ synchronized JournalSegment nextSegment() {
+ assertOpen();
+ assertDiskSpace();
+
+ final var index = currentSegment.lastIndex() + 1;
+ final var lastSegment = lastSegment();
+ currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
+ segments.put(index, currentSegment);
+ return currentSegment;
+ }
+
+ /**
+ * Returns the segment following the segment with the given ID.
+ *
+ * @param index The segment index with which to look up the next segment.
+ * @return The next segment for the given index.
+ */
+ JournalSegment nextSegment(final long index) {
+ final var higherEntry = segments.higherEntry(index);
+ return higherEntry != null ? higherEntry.getValue() : null;
+ }
+
+ /**
+ * Returns the segment for the given index.
+ *
+ * @param index The index for which to return the segment.
+ * @throws IllegalStateException if the segment manager is not open
+ */
+ synchronized JournalSegment segment(final long index) {
+ assertOpen();
+ // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
+ if (currentSegment != null && index > currentSegment.firstIndex()) {
+ return currentSegment;
+ }
+
+ // If the index is in another segment, get the entry with the next lowest first index.
+ final var segment = segments.floorEntry(index);
+ return segment != null ? segment.getValue() : firstSegment();
+ }
+
+ /**
+ * Removes a segment.
+ *
+ * @param segment The segment to remove.
+ */
+ synchronized void removeSegment(final JournalSegment segment) {
+ segments.remove(segment.firstIndex());
+ segment.delete();
+ resetCurrentSegment();
+ }
+
+ /**
+ * Creates a new segment.
+ */
+ JournalSegment createSegment(final long id, final long index) {
+ final JournalSegmentFile file;
+ try {
+ file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
+ .withId(id)
+ .withIndex(index)
+ .withMaxSegmentSize(maxSegmentSize)
+ // FIXME: propagate maxEntries
+ .withMaxEntries(Integer.MAX_VALUE)
+ .withUpdated(System.currentTimeMillis())
+ .build());
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+
+ final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
+ LOG.debug("Created segment: {}", segment);
+ return segment;
+ }
+
+ /**
+ * Loads all segments from disk.
+ *
+ * @return A collection of segments for the log.
+ */
+ protected Collection<JournalSegment> loadSegments() {
+ // Ensure log directories are created.
+ directory.mkdirs();
+
+ final var segmentsMap = new TreeMap<Long, JournalSegment>();
+
+ // Iterate through all files in the log directory.
+ for (var file : directory.listFiles(File::isFile)) {
+
+ // If the file looks like a segment file, attempt to load the segment.
+ if (JournalSegmentFile.isSegmentFile(name, file)) {
+ final JournalSegmentFile segmentFile;
+ try {
+ segmentFile = JournalSegmentFile.openExisting(file.toPath());
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+
+ // Load the segment.
+ LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
+
+ // Add the segment to the segments list.
+ final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
+ segments.put(segment.firstIndex(), segment);
+ }
+ }
+
+ // Verify that all the segments in the log align with one another.
+ JournalSegment previousSegment = null;
+ boolean corrupted = false;
+ for (var iterator = segmentsMap.entrySet().iterator(); iterator.hasNext(); ) {
+ final var segment = iterator.next().getValue();
+ if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
+ LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
+ previousSegment.file().path());
+ corrupted = true;
+ }
+ if (corrupted) {
+ segment.delete();
+ iterator.remove();
+ }
+ previousSegment = segment;
+ }
+
+ return segmentsMap.values();
+ }
+
+ /**
+ * Resets journal readers to the given head.
+ *
+ * @param index The index at which to reset readers.
+ */
+ void resetHead(final long index) {
+ for (var reader : readers) {
+ if (reader.nextIndex() < index) {
+ reader.reset(index);
+ }
+ }
+ }
+
+ /**
+ * Resets journal readers to the given tail.
+ *
+ * @param index The index at which to reset readers.
+ */
+ void resetTail(final long index) {
+ for (var reader : readers) {
+ if (reader.nextIndex() >= index) {
+ reader.reset(index);
+ }
+ }
+ }
+
+ void closeReader(final SegmentedByteBufReader reader) {
+ readers.remove(reader);
+ }
+
+ /**
+ * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
+ *
+ * @param index the index from which to remove segments
+ * @return indicates whether a segment can be removed from the journal
+ */
+ public boolean isCompactable(final long index) {
+ final var segmentEntry = segments.floorEntry(index);
+ return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
+ }
+
+ /**
+ * Returns the index of the last segment in the log.
+ *
+ * @param index the compaction index
+ * @return the starting index of the last segment in the log
+ */
+ public long getCompactableIndex(final long index) {
+ final var segmentEntry = segments.floorEntry(index);
+ return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
+ }
+
+ /**
+ * Compacts the journal up to the given index.
+ * <p>
+ * The semantics of compaction are not specified by this interface.
+ *
+ * @param index The index up to which to compact the journal.
+ */
+ public void compact(final long index) {
+ final var segmentEntry = segments.floorEntry(index);
+ if (segmentEntry != null) {
+ final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
+ if (!compactSegments.isEmpty()) {
+ LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
+ compactSegments.values().forEach(JournalSegment::delete);
+ compactSegments.clear();
+ resetHead(segmentEntry.getValue().firstIndex());
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (currentSegment != null) {
+ currentSegment = null;
+ segments.values().forEach(JournalSegment::close);
+ segments.clear();
+ }
+ }
+
+ /**
+ * Returns whether {@code flushOnCommit} is enabled for the log.
+ *
+ * @return Indicates whether {@code flushOnCommit} is enabled for the log.
+ */
+ boolean isFlushOnCommit() {
+ return flushOnCommit;
+ }
+
+ /**
+ * Updates commit index to the given value.
+ *
+ * @param index The index value.
+ */
+ void setCommitIndex(final long index) {
+ commitIndex = index;
+ }
+
+ /**
+ * Returns the journal last commit index.
+ *
+ * @return The journal last commit index.
+ */
+ long getCommitIndex() {
+ return commitIndex;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Segmented byte journal builder.
+ */
+ public static final class Builder {
+ private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
+ private static final String DEFAULT_NAME = "atomix";
+ private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
+ private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
+ private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
+ private static final double DEFAULT_INDEX_DENSITY = .005;
+
+ private String name = DEFAULT_NAME;
+ private StorageLevel storageLevel = StorageLevel.DISK;
+ private File directory = new File(DEFAULT_DIRECTORY);
+ private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
+ private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+ private double indexDensity = DEFAULT_INDEX_DENSITY;
+ private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
+
+ private Builder() {
+ // on purpose
+ }
+
+ /**
+ * Sets the journal name.
+ *
+ * @param name The journal name.
+ * @return The builder instance
+ */
+ public Builder withName(final String name) {
+ this.name = requireNonNull(name, "name cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the storage level.
+ *
+ * @param storageLevel The storage level.
+ * @return The builder instance
+ */
+ public Builder withStorageLevel(final StorageLevel storageLevel) {
+ this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the journal directory.
+ *
+ * @param directory The log directory.
+ * @return The builder instance
+ * @throws NullPointerException If the {@code directory} is {@code null}
+ */
+ public Builder withDirectory(final String directory) {
+ return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
+ }
+
+ /**
+ * Sets the journal directory
+ *
+ * @param directory The log directory.
+ * @return The builder instance
+ * @throws NullPointerException If the {@code directory} is {@code null}
+ */
+ public Builder withDirectory(final File directory) {
+ this.directory = requireNonNull(directory, "directory cannot be null");
+ return this;
+ }
+
+ /**
+ * Sets the maximum segment size in bytes.
+ * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
+ *
+ * @param maxSegmentSize The maximum segment size in bytes.
+ * @return The builder instance
+ * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
+ */
+ public Builder withMaxSegmentSize(final int maxSegmentSize) {
+ checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
+ "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
+ this.maxSegmentSize = maxSegmentSize;
+ return this;
+ }
+
+ /**
+ * Sets the maximum entry size in bytes.
+ *
+ * @param maxEntrySize the maximum entry size in bytes
+ * @return the builder instance
+ * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
+ */
+ public Builder withMaxEntrySize(final int maxEntrySize) {
+ checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
+ this.maxEntrySize = maxEntrySize;
+ return this;
+ }
+
+ /**
+ * Sets the journal index density.
+ * <p>
+ * The index density is the frequency at which the position of entries written to the journal will be
+ * recorded in an in-memory index for faster seeking.
+ *
+ * @param indexDensity the index density
+ * @return the builder instance
+ * @throws IllegalArgumentException if the density is not between 0 and 1
+ */
+ public Builder withIndexDensity(final double indexDensity) {
+ checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
+ this.indexDensity = indexDensity;
+ return this;
+ }
+
+ /**
+ * Enables flushing buffers to disk when entries are committed to a segment.
+ * <p>
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
+ * an entry is committed in a given segment.
+ *
+ * @return The builder instance
+ */
+ public Builder withFlushOnCommit() {
+ return withFlushOnCommit(true);
+ }
+
+ /**
+ * Sets whether to flush buffers to disk when entries are committed to a segment.
+ * <p>
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time
+ * an entry is committed in a given segment.
+ *
+ * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
+ * @return The builder instance
+ */
+ public Builder withFlushOnCommit(final boolean flushOnCommit) {
+ this.flushOnCommit = flushOnCommit;
+ return this;
+ }
+
+ /**
+ * Build the {@link SegmentedByteBufJournal}.
+ *
+ * @return {@link SegmentedByteBufJournal} instance built.
+ */
+ public SegmentedByteBufJournal build() {
+ return new SegmentedByteBufJournal(name, storageLevel, directory, maxSegmentSize, maxEntrySize,
+ indexDensity, flushOnCommit);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * A {@link ByteBufReader} implementation.
+ */
+sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader {
+ final @NonNull SegmentedByteBufJournal journal;
+
+ private JournalSegment currentSegment;
+ private JournalSegmentReader currentReader;
+ private long nextIndex;
+
+ SegmentedByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
+ this.journal = requireNonNull(journal);
+ currentSegment = requireNonNull(segment);
+ currentReader = segment.createReader();
+ nextIndex = currentSegment.firstIndex();
+ }
+
+ @Override
+ public final long firstIndex() {
+ return journal.firstSegment().firstIndex();
+ }
+
+ @Override
+ public final long nextIndex() {
+ return nextIndex;
+ }
+
+ @Override
+ public final void reset() {
+ currentReader.close();
+ currentSegment = journal.firstSegment();
+ currentReader = currentSegment.createReader();
+ nextIndex = currentSegment.firstIndex();
+ }
+
+ @Override
+ public final void reset(final long index) {
+ // If the current segment is not open, it has been replaced. Reset the segments.
+ if (!currentSegment.isOpen()) {
+ reset();
+ }
+ if (index < nextIndex) {
+ rewind(index);
+ } else if (index > nextIndex) {
+ forwardTo(index);
+ } else {
+ resetCurrentReader(index);
+ }
+ }
+
+ private void resetCurrentReader(final long index) {
+ final var position = currentSegment.lookup(index - 1);
+ if (position != null) {
+ nextIndex = position.index();
+ currentReader.setPosition(position.position());
+ } else {
+ nextIndex = currentSegment.firstIndex();
+ currentReader.setPosition(JournalSegmentDescriptor.BYTES);
+ }
+ forwardTo(index);
+ }
+
+ /**
+ * Rewinds the journal to the given index.
+ */
+ private void rewind(final long index) {
+ if (currentSegment.firstIndex() >= index) {
+ final var segment = journal.segment(index - 1);
+ if (segment != null) {
+ currentReader.close();
+ currentSegment = segment;
+ currentReader = currentSegment.createReader();
+ }
+ }
+ resetCurrentReader(index);
+ }
+
+ private void forwardTo(final long index) {
+ while (nextIndex < index && tryAdvance(nextIndex) != null) {
+ // No-op -- nextIndex value is updated in tryAdvance()
+ }
+ }
+
+ @Override
+ public final <T> T tryNext(final EntryMapper<T> entryMapper) {
+ final var index = nextIndex;
+ final var bytes = tryAdvance(index);
+ return bytes == null ? null : entryMapper.mapEntry(index, bytes);
+ }
+
+ /**
+ * Attempt to read the next entry. {@code index} here is really {@code nextIndex} passed by callers, which already
+ * check it for invariants. If non-null is returned, {@code nextIndex} has already been set to {@code index + 1}.
+ *
+ * <p>
+ * This method is shared between 'all entries' and 'committed entries only' variants. The distinction is made by
+ * an additional check in {@link SegmentedCommitsByteBufReader#tryAdvance(long)}.
+ *
+ * @param index next index
+ * @return Entry bytes, or {@code null}
+ */
+ ByteBuf tryAdvance(final long index) {
+ var buf = currentReader.readBytes();
+ if (buf == null) {
+ final var nextSegment = journal.nextSegment(currentSegment.firstIndex());
+ if (nextSegment == null || nextSegment.firstIndex() != index) {
+ return null;
+ }
+ currentReader.close();
+ currentSegment = nextSegment;
+ currentReader = currentSegment.createReader();
+ buf = currentReader.readBytes();
+ if (buf == null) {
+ return null;
+ }
+ }
+ nextIndex = index + 1;
+ return buf;
+ }
+
+ @Override
+ public final void close() {
+ currentReader.close();
+ journal.closeReader(this);
+ }
+}
--- /dev/null
+/*
+ * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * A {@link ByteBufWriter} implementation.
+ */
+final class SegmentedByteBufWriter implements ByteBufWriter {
+ private final SegmentedByteBufJournal journal;
+
+ private JournalSegment currentSegment;
+ private JournalSegmentWriter currentWriter;
+
+ SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
+ this.journal = requireNonNull(journal);
+ currentSegment = journal.lastSegment();
+ currentWriter = currentSegment.acquireWriter();
+ }
+
+ @Override
+ public long lastIndex() {
+ return currentWriter.getLastIndex();
+ }
+
+ @Override
+ public long nextIndex() {
+ return currentWriter.getNextIndex();
+ }
+
+ @Override
+ public void reset(final long index) {
+ if (index > currentSegment.firstIndex()) {
+ currentSegment.releaseWriter();
+ currentSegment = journal.resetSegments(index);
+ currentWriter = currentSegment.acquireWriter();
+ } else {
+ truncate(index - 1);
+ }
+ journal.resetHead(index);
+ }
+
+ @Override
+ public void commit(final long index) {
+ if (index > journal.getCommitIndex()) {
+ journal.setCommitIndex(index);
+ if (journal.isFlushOnCommit()) {
+ flush();
+ }
+ }
+ }
+
+ @Override
+ public long append(final ByteBuf buf) {
+ var index = currentWriter.append(buf);
+ if (index != null) {
+ return index;
+ }
+ // Slow path: we do not have enough capacity
+ currentWriter.flush();
+ currentSegment.releaseWriter();
+ currentSegment = journal.nextSegment();
+ currentWriter = currentSegment.acquireWriter();
+ return verifyNotNull(currentWriter.append(buf));
+ }
+
+ @Override
+ public void truncate(final long index) {
+ if (index < journal.getCommitIndex()) {
+ throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+ }
+
+ // Delete all segments with first indexes greater than the given index.
+ while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
+ currentSegment.releaseWriter();
+ journal.removeSegment(currentSegment);
+ currentSegment = journal.lastSegment();
+ currentWriter = currentSegment.acquireWriter();
+ }
+
+ // Truncate the current index.
+ currentWriter.truncate(index);
+
+ // Reset segment readers.
+ journal.resetTail(index + 1);
+ }
+
+ @Override
+ public void flush() {
+ currentWriter.flush();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * A {@link ByteBufReader} traversing only committed entries.
+ */
+final class SegmentedCommitsByteBufReader extends SegmentedByteBufReader {
+ SegmentedCommitsByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
+ super(journal, segment);
+ }
+
+ @Override
+ ByteBuf tryAdvance(final long index) {
+ return index <= journal.getCommitIndex() ? super.tryAdvance(index) : null;
+ }
+}
\ No newline at end of file
/*
* Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package io.atomix.storage.journal;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Segmented journal.
*/
public final class SegmentedJournal<E> implements Journal<E> {
- /**
- * Returns a new Raft log builder.
- *
- * @return A new Raft log builder.
- */
- public static <E> Builder<E> builder() {
- return new Builder<>();
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournal.class);
- private static final int SEGMENT_BUFFER_FACTOR = 3;
-
- private final String name;
- private final StorageLevel storageLevel;
- private final File directory;
- private final JournalSerializer<E> serializer;
- private final int maxSegmentSize;
- private final int maxEntrySize;
- private final int maxEntriesPerSegment;
- private final double indexDensity;
- private final boolean flushOnCommit;
- private final SegmentedJournalWriter<E> writer;
- private volatile long commitIndex;
-
- private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader<?>> readers = ConcurrentHashMap.newKeySet();
- private JournalSegment currentSegment;
-
- private volatile boolean open = true;
-
- public SegmentedJournal(
- String name,
- StorageLevel storageLevel,
- File directory,
- JournalSerdes namespace,
- int maxSegmentSize,
- int maxEntrySize,
- int maxEntriesPerSegment,
- double indexDensity,
- boolean flushOnCommit) {
- this.name = requireNonNull(name, "name cannot be null");
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- this.directory = requireNonNull(directory, "directory cannot be null");
- this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
- this.maxSegmentSize = maxSegmentSize;
- this.maxEntrySize = maxEntrySize;
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- this.indexDensity = indexDensity;
- this.flushOnCommit = flushOnCommit;
- open();
- this.writer = new SegmentedJournalWriter<>(this);
- }
-
- /**
- * Returns the segment file name prefix.
- *
- * @return The segment file name prefix.
- */
- public String name() {
- return name;
- }
-
- /**
- * Returns the storage directory.
- * <p>
- * The storage directory is the directory to which all segments write files. Segment files for multiple logs may be
- * stored in the storage directory, and files for each log instance will be identified by the {@code prefix} provided
- * when the log is opened.
- *
- * @return The storage directory.
- */
- public File directory() {
- return directory;
- }
-
- /**
- * Returns the storage level.
- * <p>
- * The storage level dictates how entries within individual journal segments should be stored.
- *
- * @return The storage level.
- */
- public StorageLevel storageLevel() {
- return storageLevel;
- }
-
- /**
- * Returns the maximum journal segment size.
- * <p>
- * The maximum segment size dictates the maximum size any segment in a segment may consume in bytes.
- *
- * @return The maximum segment size in bytes.
- */
- public int maxSegmentSize() {
- return maxSegmentSize;
- }
-
- /**
- * Returns the maximum journal entry size.
- * <p>
- * The maximum entry size dictates the maximum size any entry in the segment may consume in bytes.
- *
- * @return the maximum entry size in bytes
- */
- public int maxEntrySize() {
- return maxEntrySize;
- }
-
- /**
- * Returns the maximum number of entries per segment.
- * <p>
- * The maximum entries per segment dictates the maximum number of entries that are allowed to be stored in any segment
- * in a journal.
- *
- * @return The maximum number of entries per segment.
- * @deprecated since 3.0.2
- */
- @Deprecated
- public int maxEntriesPerSegment() {
- return maxEntriesPerSegment;
- }
-
- /**
- * Returns the collection of journal segments.
- *
- * @return the collection of journal segments
- */
- public Collection<JournalSegment> segments() {
- return segments.values();
- }
-
- /**
- * Returns the collection of journal segments with indexes greater than the given index.
- *
- * @param index the starting index
- * @return the journal segments starting with indexes greater than or equal to the given index
- */
- public Collection<JournalSegment> segments(long index) {
- return segments.tailMap(index).values();
- }
-
- /**
- * Returns serializer instance.
- *
- * @return serializer instance
- */
- JournalSerializer<E> serializer() {
- return serializer;
- }
-
- /**
- * Returns the total size of the journal.
- *
- * @return the total size of the journal
- */
- public long size() {
- return segments.values().stream()
- .mapToLong(segment -> {
- try {
- return segment.file().size();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- })
- .sum();
- }
-
- @Override
- public JournalWriter<E> writer() {
- return writer;
- }
-
- @Override
- public JournalReader<E> openReader(long index) {
- return openReader(index, JournalReader.Mode.ALL);
- }
-
- /**
- * Opens a new Raft log reader with the given reader mode.
- *
- * @param index The index from which to begin reading entries.
- * @param mode The mode in which to read entries.
- * @return The Raft log reader.
- */
- @Override
- public JournalReader<E> openReader(long index, JournalReader.Mode mode) {
- final var segment = getSegment(index);
- final var reader = switch (mode) {
- case ALL -> new SegmentedJournalReader<>(this, segment);
- case COMMITS -> new CommitsSegmentJournalReader<>(this, segment);
- };
-
- // Forward reader to specified index
- long next = reader.getNextIndex();
- while (index > next && reader.tryAdvance()) {
- next = reader.getNextIndex();
+ private final AtomicBoolean open = new AtomicBoolean(true);
+ private final SegmentedByteBufJournal journal;
+ private final SegmentedJournalWriter<E> writer;
+ private final ByteBufMapper<E> mapper;
+
+ public SegmentedJournal(final SegmentedByteBufJournal journal, final ByteBufMapper<E> mapper) {
+ this.journal = requireNonNull(journal, "journal is required");
+ this.mapper = requireNonNull(mapper, "mapper cannot be null");
+ writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
}
- readers.add(reader);
- return reader;
- }
-
- /**
- * Opens the segments.
- */
- private synchronized void open() {
- // Load existing log segments from disk.
- for (var segment : loadSegments()) {
- segments.put(segment.firstIndex(), segment);
+ @Override
+ public JournalWriter<E> writer() {
+ return writer;
}
- // If a segment doesn't already exist, create an initial segment starting at index 1.
- if (!segments.isEmpty()) {
- currentSegment = segments.lastEntry().getValue();
- } else {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- }
- }
-
- /**
- * Asserts that the manager is open.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- private void assertOpen() {
- checkState(currentSegment != null, "journal not open");
- }
-
- /**
- * Asserts that enough disk space is available to allocate a new segment.
- */
- private void assertDiskSpace() {
- if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
- throw new StorageException.OutOfDiskSpace("Not enough space to allocate a new journal segment");
- }
- }
-
- /**
- * Resets the current segment, creating a new segment if necessary.
- */
- private synchronized void resetCurrentSegment() {
- final var lastSegment = getLastSegment();
- if (lastSegment == null) {
- currentSegment = createSegment(1, 1);
- segments.put(1L, currentSegment);
- } else {
- currentSegment = lastSegment;
- }
- }
-
- /**
- * Resets and returns the first segment in the journal.
- *
- * @param index the starting index of the journal
- * @return the first segment
- */
- JournalSegment resetSegments(long index) {
- assertOpen();
-
- // If the index already equals the first segment index, skip the reset.
- final var firstSegment = getFirstSegment();
- if (index == firstSegment.firstIndex()) {
- return firstSegment;
- }
-
- segments.values().forEach(JournalSegment::delete);
- segments.clear();
-
- currentSegment = createSegment(1, index);
- segments.put(index, currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the first segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment getFirstSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
- * Returns the last segment in the log.
- *
- * @throws IllegalStateException if the segment manager is not open
- */
- JournalSegment getLastSegment() {
- assertOpen();
- Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
- return segment != null ? segment.getValue() : null;
- }
-
- /**
- * Creates and returns the next segment.
- *
- * @return The next segment.
- * @throws IllegalStateException if the segment manager is not open
- */
- synchronized JournalSegment getNextSegment() {
- assertOpen();
- assertDiskSpace();
-
- final var index = currentSegment.lastIndex() + 1;
- final var lastSegment = getLastSegment();
- currentSegment = createSegment(lastSegment != null ? lastSegment.file().segmentId() + 1 : 1, index);
- segments.put(index, currentSegment);
- return currentSegment;
- }
-
- /**
- * Returns the segment following the segment with the given ID.
- *
- * @param index The segment index with which to look up the next segment.
- * @return The next segment for the given index.
- */
- JournalSegment getNextSegment(long index) {
- Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
- return nextSegment != null ? nextSegment.getValue() : null;
- }
-
- /**
- * Returns the segment for the given index.
- *
- * @param index The index for which to return the segment.
- * @throws IllegalStateException if the segment manager is not open
- */
- synchronized JournalSegment getSegment(long index) {
- assertOpen();
- // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
- if (currentSegment != null && index > currentSegment.firstIndex()) {
- return currentSegment;
- }
-
- // If the index is in another segment, get the entry with the next lowest first index.
- Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
- if (segment != null) {
- return segment.getValue();
- }
- return getFirstSegment();
- }
-
- /**
- * Removes a segment.
- *
- * @param segment The segment to remove.
- */
- synchronized void removeSegment(JournalSegment segment) {
- segments.remove(segment.firstIndex());
- segment.delete();
- resetCurrentSegment();
- }
-
- /**
- * Creates a new segment.
- */
- JournalSegment createSegment(long id, long index) {
- final JournalSegmentFile file;
- try {
- file = JournalSegmentFile.createNew(name, directory, JournalSegmentDescriptor.builder()
- .withId(id)
- .withIndex(index)
- .withMaxSegmentSize(maxSegmentSize)
- .withMaxEntries(maxEntriesPerSegment)
- .withUpdated(System.currentTimeMillis())
- .build());
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- final var segment = new JournalSegment(file, storageLevel, maxEntrySize, indexDensity);
- LOG.debug("Created segment: {}", segment);
- return segment;
- }
-
- /**
- * Loads all segments from disk.
- *
- * @return A collection of segments for the log.
- */
- protected Collection<JournalSegment> loadSegments() {
- // Ensure log directories are created.
- directory.mkdirs();
-
- final var segments = new TreeMap<Long, JournalSegment>();
-
- // Iterate through all files in the log directory.
- for (var file : directory.listFiles(File::isFile)) {
-
- // If the file looks like a segment file, attempt to load the segment.
- if (JournalSegmentFile.isSegmentFile(name, file)) {
- final JournalSegmentFile segmentFile;
- try {
- segmentFile = JournalSegmentFile.openExisting(file.toPath());
- } catch (IOException e) {
- throw new StorageException(e);
- }
-
- // Load the segment.
- LOG.debug("Loaded disk segment: {} ({})", segmentFile.segmentId(), segmentFile.path());
-
- // Add the segment to the segments list.
- final var segment = new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
- segments.put(segment.firstIndex(), segment);
- }
- }
-
- // Verify that all the segments in the log align with one another.
- JournalSegment previousSegment = null;
- boolean corrupted = false;
- final var iterator = segments.entrySet().iterator();
- while (iterator.hasNext()) {
- final var segment = iterator.next().getValue();
- if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
- LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
- previousSegment.file().path());
- corrupted = true;
- }
- if (corrupted) {
- segment.delete();
- iterator.remove();
- }
- previousSegment = segment;
- }
-
- return segments.values();
- }
-
- /**
- * Resets journal readers to the given head.
- *
- * @param index The index at which to reset readers.
- */
- void resetHead(long index) {
- for (var reader : readers) {
- if (reader.getNextIndex() < index) {
- reader.reset(index);
- }
- }
- }
-
- /**
- * Resets journal readers to the given tail.
- *
- * @param index The index at which to reset readers.
- */
- void resetTail(long index) {
- for (var reader : readers) {
- if (reader.getNextIndex() >= index) {
- reader.reset(index);
- }
- }
- }
-
- void closeReader(SegmentedJournalReader<E> reader) {
- readers.remove(reader);
- }
-
- @Override
- public boolean isOpen() {
- return open;
- }
-
- /**
- * Returns a boolean indicating whether a segment can be removed from the journal prior to the given index.
- *
- * @param index the index from which to remove segments
- * @return indicates whether a segment can be removed from the journal
- */
- public boolean isCompactable(long index) {
- Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
- }
-
- /**
- * Returns the index of the last segment in the log.
- *
- * @param index the compaction index
- * @return the starting index of the last segment in the log
- */
- public long getCompactableIndex(long index) {
- Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
- return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
- }
-
- /**
- * Compacts the journal up to the given index.
- * <p>
- * The semantics of compaction are not specified by this interface.
- *
- * @param index The index up to which to compact the journal.
- */
- public void compact(long index) {
- final var segmentEntry = segments.floorEntry(index);
- if (segmentEntry != null) {
- final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
- if (!compactSegments.isEmpty()) {
- LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- compactSegments.values().forEach(JournalSegment::delete);
- compactSegments.clear();
- resetHead(segmentEntry.getValue().firstIndex());
- }
- }
- }
-
- @Override
- public void close() {
- segments.values().forEach(JournalSegment::close);
- currentSegment = null;
- open = false;
- }
-
- /**
- * Returns whether {@code flushOnCommit} is enabled for the log.
- *
- * @return Indicates whether {@code flushOnCommit} is enabled for the log.
- */
- boolean isFlushOnCommit() {
- return flushOnCommit;
- }
-
- /**
- * Commits entries up to the given index.
- *
- * @param index The index up to which to commit entries.
- */
- void setCommitIndex(long index) {
- this.commitIndex = index;
- }
-
- /**
- * Returns the Raft log commit index.
- *
- * @return The Raft log commit index.
- */
- long getCommitIndex() {
- return commitIndex;
- }
-
- /**
- * Raft log builder.
- */
- public static final class Builder<E> {
- private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
- private static final String DEFAULT_NAME = "atomix";
- private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
- private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
- private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
- private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
- private static final double DEFAULT_INDEX_DENSITY = .005;
-
- private String name = DEFAULT_NAME;
- private StorageLevel storageLevel = StorageLevel.DISK;
- private File directory = new File(DEFAULT_DIRECTORY);
- private JournalSerdes namespace;
- private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
- private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
- private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
- private double indexDensity = DEFAULT_INDEX_DENSITY;
- private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
-
- Builder() {
- // Hidden on purpose
+ @Override
+ public JournalReader<E> openReader(final long index) {
+ return openReader(index, JournalReader.Mode.ALL);
}
/**
- * Sets the storage name.
+ * Opens a new journal reader with the given reader mode.
*
- * @param name The storage name.
- * @return The storage builder.
+ * @param index The index from which to begin reading entries.
+ * @param mode The mode in which to read entries.
+ * @return The journal reader.
*/
- public Builder<E> withName(String name) {
- this.name = requireNonNull(name, "name cannot be null");
- return this;
+ @Override
+ public JournalReader<E> openReader(final long index, final JournalReader.Mode mode) {
+ final var byteReader = switch (mode) {
+ case ALL -> journal.openReader(index);
+ case COMMITS -> journal.openCommitsReader(index);
+ };
+ return new SegmentedJournalReader<>(byteReader, mapper);
}
- /**
- * Sets the log storage level, returning the builder for method chaining.
- * <p>
- * The storage level indicates how individual entries should be persisted in the journal.
- *
- * @param storageLevel The log storage level.
- * @return The storage builder.
- */
- public Builder<E> withStorageLevel(StorageLevel storageLevel) {
- this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
- return this;
+ @Override
+ public boolean isOpen() {
+ return open.get();
}
- /**
- * Sets the log directory, returning the builder for method chaining.
- * <p>
- * The log will write segment files into the provided directory.
- *
- * @param directory The log directory.
- * @return The storage builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
- */
- public Builder<E> withDirectory(String directory) {
- return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
+ @Override
+ public void close() {
+ if (open.compareAndExchange(true, false)) {
+ journal.close();
+ }
}
/**
- * Sets the log directory, returning the builder for method chaining.
+ * Compacts the journal up to the given index.
* <p>
- * The log will write segment files into the provided directory.
+ * The semantics of compaction are not specified by this interface.
*
- * @param directory The log directory.
- * @return The storage builder.
- * @throws NullPointerException If the {@code directory} is {@code null}
+ * @param index The index up to which to compact the journal.
*/
- public Builder<E> withDirectory(File directory) {
- this.directory = requireNonNull(directory, "directory cannot be null");
- return this;
+ public void compact(final long index) {
+ journal.compact(index);
}
/**
- * Sets the journal namespace, returning the builder for method chaining.
+ * Returns a new segmented journal builder.
*
- * @param namespace The journal serializer.
- * @return The journal builder.
+ * @return A new segmented journal builder.
*/
- public Builder<E> withNamespace(JournalSerdes namespace) {
- this.namespace = requireNonNull(namespace, "namespace cannot be null");
- return this;
+ public static <E> Builder<E> builder() {
+ return new Builder<>();
}
- /**
- * Sets the maximum segment size in bytes, returning the builder for method chaining.
- * <p>
- * The maximum segment size dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, once the size of the segment surpasses the configured maximum segment size, the log will create a new
- * segment and append new entries to that segment.
- * <p>
- * By default, the maximum segment size is {@code 1024 * 1024 * 32}.
- *
- * @param maxSegmentSize The maximum segment size in bytes.
- * @return The storage builder.
- * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
- */
- public Builder<E> withMaxSegmentSize(int maxSegmentSize) {
- checkArgument(maxSegmentSize > JournalSegmentDescriptor.BYTES,
- "maxSegmentSize must be greater than " + JournalSegmentDescriptor.BYTES);
- this.maxSegmentSize = maxSegmentSize;
- return this;
- }
+ public static final class Builder<E> {
+ private final SegmentedByteBufJournal.Builder byteJournalBuilder = SegmentedByteBufJournal.builder();
+ private ByteBufMapper<E> mapper;
- /**
- * Sets the maximum entry size in bytes, returning the builder for method chaining.
- *
- * @param maxEntrySize the maximum entry size in bytes
- * @return the storage builder
- * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
- */
- public Builder<E> withMaxEntrySize(int maxEntrySize) {
- checkArgument(maxEntrySize > 0, "maxEntrySize must be positive");
- this.maxEntrySize = maxEntrySize;
- return this;
- }
+ private Builder() {
+ // on purpose
+ }
- /**
- * Sets the maximum number of allows entries per segment, returning the builder for method chaining.
- * <p>
- * The maximum entry count dictates when logs should roll over to new segments. As entries are written to a segment
- * of the log, if the entry count in that segment meets the configured maximum entry count, the log will create a
- * new segment and append new entries to that segment.
- * <p>
- * By default, the maximum entries per segment is {@code 1024 * 1024}.
- *
- * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
- * @return The storage builder.
- * @throws IllegalArgumentException If the {@code maxEntriesPerSegment} not greater than the default max entries
- * per segment
- * @deprecated since 3.0.2
- */
- @Deprecated
- public Builder<E> withMaxEntriesPerSegment(int maxEntriesPerSegment) {
- checkArgument(maxEntriesPerSegment > 0, "max entries per segment must be positive");
- checkArgument(maxEntriesPerSegment <= DEFAULT_MAX_ENTRIES_PER_SEGMENT,
- "max entries per segment cannot be greater than " + DEFAULT_MAX_ENTRIES_PER_SEGMENT);
- this.maxEntriesPerSegment = maxEntriesPerSegment;
- return this;
- }
+ /**
+ * Sets the journal name.
+ *
+ * @param name The journal name.
+ * @return The journal builder.
+ */
+ public Builder<E> withName(final String name) {
+ byteJournalBuilder.withName(name);
+ return this;
+ }
- /**
- * Sets the journal index density.
- * <p>
- * The index density is the frequency at which the position of entries written to the journal will be recorded in an
- * in-memory index for faster seeking.
- *
- * @param indexDensity the index density
- * @return the journal builder
- * @throws IllegalArgumentException if the density is not between 0 and 1
- */
- public Builder<E> withIndexDensity(double indexDensity) {
- checkArgument(indexDensity > 0 && indexDensity < 1, "index density must be between 0 and 1");
- this.indexDensity = indexDensity;
- return this;
- }
+ /**
+ * Sets the journal storage level.
+ * <p>
+ * The storage level indicates how individual entries will be persisted in the journal.
+ *
+ * @param storageLevel The log storage level.
+ * @return The journal builder.
+ */
+ public Builder<E> withStorageLevel(final StorageLevel storageLevel) {
+ byteJournalBuilder.withStorageLevel(storageLevel);
+ return this;
+ }
- /**
- * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
- * chaining.
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
- * committed in a given segment.
- *
- * @return The storage builder.
- */
- public Builder<E> withFlushOnCommit() {
- return withFlushOnCommit(true);
- }
+ /**
+ * Sets the journal storage directory.
+ * <p>
+ * The journal will write segment files into the provided directory.
+ *
+ * @param directory The journal storage directory.
+ * @return The journal builder.
+ * @throws NullPointerException If the {@code directory} is {@code null}
+ */
+ public Builder<E> withDirectory(final String directory) {
+ byteJournalBuilder.withDirectory(directory);
+ return this;
+ }
- /**
- * Sets whether to flush buffers to disk when entries are committed to a segment, returning the builder for method
- * chaining.
- * <p>
- * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an entry is
- * committed in a given segment.
- *
- * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
- * @return The storage builder.
- */
- public Builder<E> withFlushOnCommit(boolean flushOnCommit) {
- this.flushOnCommit = flushOnCommit;
- return this;
- }
+ /**
+ * Sets the journal storage directory.
+ * <p>
+ * The journal will write segment files into the provided directory.
+ *
+ * @param directory The journal storage directory.
+ * @return The journal builder.
+ * @throws NullPointerException If the {@code directory} is {@code null}
+ */
+ public Builder<E> withDirectory(final File directory) {
+ byteJournalBuilder.withDirectory(directory);
+ return this;
+ }
- /**
- * Build the {@link SegmentedJournal}.
- *
- * @return A new {@link SegmentedJournal}.
- */
- public SegmentedJournal<E> build() {
- return new SegmentedJournal<>(
- name,
- storageLevel,
- directory,
- namespace,
- maxSegmentSize,
- maxEntrySize,
- maxEntriesPerSegment,
- indexDensity,
- flushOnCommit);
+ /**
+ * Sets the journal namespace.
+ *
+ * @param namespace The journal serializer.
+ * @return The journal builder.
+ * @deprecated due to serialization refactoring, use {@link Builder#withMapper(ByteBufMapper)} instead
+ */
+ @Deprecated(forRemoval = true, since="9.0.3")
+ public Builder<E> withNamespace(final JournalSerdes namespace) {
+ return withMapper(requireNonNull(namespace, "namespace cannot be null").toMapper());
+ }
+
+ /**
+ * Sets journal serializer.
+ *
+ * @param mapper Journal serializer
+ * @return The journal builder
+ */
+ public Builder<E> withMapper(final ByteBufMapper<E> mapper) {
+ this.mapper = requireNonNull(mapper);
+ return this;
+ }
+
+ /**
+ * Sets the maximum segment size in bytes.
+ * <p>
+ * The maximum segment size dictates when journal should roll over to new segments. As entries are written
+ * to a journal segment, once the size of the segment surpasses the configured maximum segment size, the
+ * journal will create a new segment and append new entries to that segment.
+ * <p>
+ * By default, the maximum segment size is 32M.
+ *
+ * @param maxSegmentSize The maximum segment size in bytes.
+ * @return The storage builder.
+ * @throws IllegalArgumentException If the {@code maxSegmentSize} is not positive
+ */
+ public Builder<E> withMaxSegmentSize(final int maxSegmentSize) {
+ byteJournalBuilder.withMaxSegmentSize(maxSegmentSize);
+ return this;
+ }
+
+ /**
+ * Sets the maximum entry size in bytes.
+ *
+ * @param maxEntrySize the maximum entry size in bytes
+ * @return the storage builder
+ * @throws IllegalArgumentException if the {@code maxEntrySize} is not positive
+ */
+ public Builder<E> withMaxEntrySize(final int maxEntrySize) {
+ byteJournalBuilder.withMaxEntrySize(maxEntrySize);
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of entries per segment.
+ *
+ * @param maxEntriesPerSegment The maximum number of entries allowed per segment.
+ * @return The journal builder.
+ * @deprecated since 3.0.2, no longer used
+ */
+ @Deprecated
+ public Builder<E> withMaxEntriesPerSegment(final int maxEntriesPerSegment) {
+ // ignore
+ return this;
+ }
+
+ /**
+ * Sets the journal index density.
+ * <p>
+ * The index density is the frequency at which the position of entries written to the journal will be recorded
+ * in an in-memory index for faster seeking.
+ *
+ * @param indexDensity the index density
+ * @return the journal builder
+ * @throws IllegalArgumentException if the density is not between 0 and 1
+ */
+ public Builder<E> withIndexDensity(final double indexDensity) {
+ byteJournalBuilder.withIndexDensity(indexDensity);
+ return this;
+ }
+
+ /**
+ * Enables flushing buffers to disk when entries are committed to a segment.
+ * <p>
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
+ * entry is committed in a given segment.
+ *
+ * @return The journal builder.
+ */
+ public Builder<E> withFlushOnCommit() {
+ return withFlushOnCommit(true);
+ }
+
+ /**
+ * Enables flushing buffers to disk when entries are committed to a segment.
+ * <p>
+ * When flush-on-commit is enabled, log entry buffers will be automatically flushed to disk each time an
+ * entry is committed in a given segment.
+ *
+ * @param flushOnCommit Whether to flush buffers to disk when entries are committed to a segment.
+ * @return The journal builder.
+ */
+ public Builder<E> withFlushOnCommit(final boolean flushOnCommit) {
+ byteJournalBuilder.withFlushOnCommit(flushOnCommit);
+ return this;
+ }
+
+ /**
+ * Build the {@link SegmentedJournal}.
+ *
+ * @return {@link SegmentedJournal} instance.
+ */
+ public SegmentedJournal<E> build() {
+ return new SegmentedJournal<>(byteJournalBuilder.build(), mapper);
+ }
}
- }
}
/*
* Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
- * Copyright (c) 2024 PANTHEON.tech, s.r.o.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import static java.util.Objects.requireNonNull;
-import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
/**
- * A {@link JournalReader} traversing all entries.
+ * A {@link JournalReader} backed by a {@link ByteBufReader}.
*/
-sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
- // Marker non-null object for tryAdvance()
- private static final @NonNull Object ADVANCED = new Object();
+final class SegmentedJournalReader<E> implements JournalReader<E> {
+ private final ByteBufMapper<E> mapper;
+ private final ByteBufReader reader;
- final SegmentedJournal<E> journal;
-
- private JournalSegment currentSegment;
- private JournalSegmentReader currentReader;
- private long nextIndex;
-
- SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
- this.journal = requireNonNull(journal);
- currentSegment = requireNonNull(segment);
- currentReader = segment.createReader();
- nextIndex = currentSegment.firstIndex();
+ SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper<E> mapper) {
+ this.reader = requireNonNull(reader);
+ this.mapper = requireNonNull(mapper);
}
@Override
- public final long getFirstIndex() {
- return journal.getFirstSegment().firstIndex();
+ public long getFirstIndex() {
+ return reader.firstIndex();
}
@Override
- public final long getNextIndex() {
- return nextIndex;
+ public long getNextIndex() {
+ return reader.nextIndex();
}
@Override
- public final void reset() {
- currentReader.close();
-
- currentSegment = journal.getFirstSegment();
- currentReader = currentSegment.createReader();
- nextIndex = currentSegment.firstIndex();
+ public void reset() {
+ reader.reset();
}
@Override
- public final void reset(final long index) {
- // If the current segment is not open, it has been replaced. Reset the segments.
- if (!currentSegment.isOpen()) {
- reset();
- }
-
- if (index < nextIndex) {
- rewind(index);
- } else if (index > nextIndex) {
- while (index > nextIndex && tryAdvance()) {
- // Nothing else
- }
- } else {
- resetCurrentReader(index);
- }
- }
-
- private void resetCurrentReader(final long index) {
- final var position = currentSegment.lookup(index - 1);
- if (position != null) {
- nextIndex = position.index();
- currentReader.setPosition(position.position());
- } else {
- nextIndex = currentSegment.firstIndex();
- currentReader.setPosition(JournalSegmentDescriptor.BYTES);
- }
- while (nextIndex < index && tryAdvance()) {
- // Nothing else
- }
- }
-
- /**
- * Rewinds the journal to the given index.
- */
- private void rewind(final long index) {
- if (currentSegment.firstIndex() >= index) {
- JournalSegment segment = journal.getSegment(index - 1);
- if (segment != null) {
- currentReader.close();
-
- currentSegment = segment;
- currentReader = currentSegment.createReader();
- }
- }
-
- resetCurrentReader(index);
+ public void reset(final long index) {
+ reader.reset(index);
}
@Override
- public <T> T tryNext(final EntryMapper<E, T> mapper) {
- final var index = nextIndex;
- var buf = currentReader.readBytes(index);
- if (buf == null) {
- final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
- if (nextSegment == null || nextSegment.firstIndex() != index) {
- return null;
- }
-
- currentReader.close();
-
- currentSegment = nextSegment;
- currentReader = currentSegment.createReader();
- buf = currentReader.readBytes(index);
- if (buf == null) {
- return null;
- }
- }
-
- final var entry = journal.serializer().deserialize(buf);
- final var ret = requireNonNull(mapper.mapEntry(index, entry, buf.readableBytes()));
- nextIndex = index + 1;
- return ret;
- }
-
- /**
- * Try to move to the next entry.
- *
- * @return {@code true} if there was a next entry and this reader has moved to it
- */
- final boolean tryAdvance() {
- return tryNext((index, entry, size) -> ADVANCED) != null;
+ public <T> @Nullable T tryNext(final EntryMapper<E, T> entryMapper) {
+ return reader.tryNext(
+ (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes()))
+ );
}
@Override
- public final void close() {
- currentReader.close();
- journal.closeReader(this);
+ public void close() {
+ reader.close();
}
}
/*
* Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
package io.atomix.storage.journal;
-import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
/**
- * Raft log writer.
+ * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
*/
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
- private final SegmentedJournal<E> journal;
- private JournalSegment currentSegment;
- private JournalSegmentWriter currentWriter;
+ private final ByteBufMapper<E> mapper;
+ private final ByteBufWriter writer;
- SegmentedJournalWriter(SegmentedJournal<E> journal) {
- this.journal = journal;
- this.currentSegment = journal.getLastSegment();
- this.currentWriter = currentSegment.acquireWriter();
- }
-
- @Override
- public long getLastIndex() {
- return currentWriter.getLastIndex();
- }
-
- @Override
- public long getNextIndex() {
- return currentWriter.getNextIndex();
- }
-
- @Override
- public void reset(long index) {
- if (index > currentSegment.firstIndex()) {
- currentSegment.releaseWriter();
- currentSegment = journal.resetSegments(index);
- currentWriter = currentSegment.acquireWriter();
- } else {
- truncate(index - 1);
+ SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+ this.writer = requireNonNull(writer);
+ this.mapper = requireNonNull(mapper);
}
- journal.resetHead(index);
- }
- @Override
- public void commit(long index) {
- if (index > journal.getCommitIndex()) {
- journal.setCommitIndex(index);
- if (journal.isFlushOnCommit()) {
- flush();
- }
+ @Override
+ public long getLastIndex() {
+ return writer.lastIndex();
}
- }
- @Override
- public <T extends E> Indexed<T> append(T entry) {
- final var bytes = journal.serializer().serialize(entry);
- var index = currentWriter.append(bytes);
- if (index != null) {
- return new Indexed<>(index, entry, bytes.readableBytes());
+ @Override
+ public long getNextIndex() {
+ return writer.nextIndex();
}
- // Slow path: we do not have enough capacity
- currentWriter.flush();
- currentSegment.releaseWriter();
- currentSegment = journal.getNextSegment();
- currentWriter = currentSegment.acquireWriter();
- final var newIndex = verifyNotNull(currentWriter.append(bytes));
- return new Indexed<>(newIndex, entry, bytes.readableBytes());
- }
-
- @Override
- public void truncate(long index) {
- if (index < journal.getCommitIndex()) {
- throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
+ @Override
+ public void reset(final long index) {
+ writer.reset(index);
}
- // Delete all segments with first indexes greater than the given index.
- while (index < currentSegment.firstIndex() && currentSegment != journal.getFirstSegment()) {
- currentSegment.releaseWriter();
- journal.removeSegment(currentSegment);
- currentSegment = journal.getLastSegment();
- currentWriter = currentSegment.acquireWriter();
+ @Override
+ public void commit(final long index) {
+ writer.commit(index);
}
- // Truncate the current index.
- currentWriter.truncate(index);
+ @Override
+ public <T extends E> Indexed<T> append(final T entry) {
+ final var buf = mapper.objectToBytes(entry);
+ return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+ }
- // Reset segment readers.
- journal.resetTail(index + 1);
- }
+ @Override
+ public void truncate(final long index) {
+ writer.truncate(index);
+ }
- @Override
- public void flush() {
- currentWriter.flush();
- }
+ @Override
+ public void flush() {
+ writer.flush();
+ }
}