--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import java.nio.ByteBuffer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * {@link FileAccess} for {@link StorageLevel#DISK}.
+ */
+@NonNullByDefault
+final class DiskFileAccess extends FileAccess {
+ /**
+ * Just do not bother with IO smaller than this many bytes.
+ */
+ private static final int MIN_IO_SIZE = 8192;
+
+ DiskFileAccess(final JournalSegmentFile file, final int maxEntrySize) {
+ super(file, maxEntrySize);
+ }
+
+ @Override
+ DiskFileReader newFileReader() {
+ // Each reader receives its own heap buffer, so readers do not interfere with each other
+ return new DiskFileReader(file, allocateBuffer(maxEntrySize, file.maxSize()));
+ }
+
+ @Override
+ DiskFileWriter newFileWriter() {
+ return new DiskFileWriter(file, maxEntrySize, allocateBuffer(maxEntrySize, file.maxSize()));
+ }
+
+ @Override
+ public void close() {
+ // No-op: buffers are plain heap allocations, nothing to release here
+ }
+
+ // Allocate a heap buffer sized by chooseBufferSize()
+ private static ByteBuffer allocateBuffer(final int maxEntrySize, final int maxSegmentSize) {
+ return ByteBuffer.allocate(chooseBufferSize(maxEntrySize, maxSegmentSize));
+ }
+
+ // Pick a buffer size balancing memory use against IO efficiency
+ private static int chooseBufferSize(final int maxEntrySize, final int maxSegmentSize) {
+ if (maxSegmentSize <= MIN_IO_SIZE) {
+ // just buffer the entire segment
+ return maxSegmentSize;
+ }
+
+ // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
+ final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
+ return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
+ }
+}
// tracks where memory's first available byte maps to in terms of FileChannel.position()
private int bufferPosition;
- DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) {
- this(file, file.allocateBuffer(maxEntrySize));
- }
-
// Note: take ownership of the buffer
DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
super(file);
package io.atomix.storage.journal;
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
+import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
private final FileChannel channel;
private final ByteBuffer buffer;
- DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+ DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer) {
super(file, maxEntrySize);
+ this.buffer = requireNonNull(buffer);
channel = file.channel();
- buffer = file.allocateBuffer(maxEntrySize);
reader = new DiskFileReader(file, buffer);
}
return reader;
}
- @Override
- MappedByteBuffer buffer() {
- return null;
- }
-
- @Override
- MappedFileWriter toMapped() {
- flush();
- return new MappedFileWriter(file, maxEntrySize);
- }
-
- @Override
- DiskFileWriter toDisk() {
- return null;
- }
-
@Override
void writeEmptyHeader(final int position) {
try {
}
}
}
-
- @Override
- void close() {
- flush();
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Abstract base class for accessing a particular file.
+ */
+@NonNullByDefault
+abstract sealed class FileAccess implements AutoCloseable permits DiskFileAccess, MappedFileAccess {
+ // The underlying segment file
+ final JournalSegmentFile file;
+ // Maximum size of a single stored entry
+ final int maxEntrySize;
+
+ FileAccess(final JournalSegmentFile file, final int maxEntrySize) {
+ this.file = requireNonNull(file);
+ this.maxEntrySize = maxEntrySize;
+ }
+
+ /**
+ * Create a new {@link FileReader}.
+ *
+ * @return a new {@link FileReader}
+ */
+ abstract FileReader newFileReader();
+
+ /**
+ * Create a new {@link FileWriter}.
+ *
+ * @return a new {@link FileWriter}
+ */
+ abstract FileWriter newFileWriter();
+
+ /**
+ * Release any resources associated with this access.
+ */
+ @Override
+ public abstract void close();
+}
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import org.eclipse.jdt.annotation.Nullable;
/**
* An abstraction over how to write a {@link JournalSegmentFile}.
*/
abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
- final JournalSegmentFile file;
- final int maxEntrySize;
+ private final JournalSegmentFile file;
+ private final int maxEntrySize;
FileWriter(final JournalSegmentFile file, final int maxEntrySize) {
this.file = requireNonNull(file);
this.maxEntrySize = maxEntrySize;
}
+ final JournalSegmentFile file() {
+ return file;
+ }
+
+ final int maxEntrySize() {
+ return maxEntrySize;
+ }
+
/**
* Return the internal {@link FileReader}.
*
/**
* Flushes written entries to disk.
*/
- abstract void flush();
-
- /**
- * Closes this writer.
- */
- abstract void close();
+ abstract void flush() throws IOException;
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
}
-
- abstract @Nullable MappedByteBuffer buffer();
-
- abstract @Nullable MappedFileWriter toMapped();
-
- abstract @Nullable DiskFileWriter toDisk();
}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
final class JournalSegment {
+ /**
+ * Encapsulation of a {@link JournalSegment}'s state.
+ */
+ sealed interface State {
+ // Marker interface
+ }
+
+ /**
+ * Journal segment is active, i.e. there is a writer associated with it.
+ */
+ @NonNullByDefault
+ record Active(FileAccess access, JournalSegmentWriter writer) implements State {
+ Active {
+ requireNonNull(access);
+ requireNonNull(writer);
+ }
+
+ Inactive deactivate() {
+ final var inactive = new Inactive(writer.currentPosition());
+ access.close();
+ return inactive;
+ }
+ }
+
+ /**
+ * Journal segment is inactive, i.e. there is no writer associated with it.
+ */
+ @NonNullByDefault
+ record Inactive(int position) implements State {
+ Active activate(final JournalSegment segment) throws IOException {
+ final var access = segment.file.newAccess(segment.storageLevel, segment.maxEntrySize);
+ return new Active(access, new JournalSegmentWriter(access.newFileWriter(), segment, segment.journalIndex,
+ this));
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
- private final JournalSegmentFile file;
- private final StorageLevel storageLevel;
+ private final @NonNull JournalSegmentFile file;
+ private final @NonNull StorageLevel storageLevel;
+ private final @NonNull JournalIndex journalIndex;
private final int maxEntrySize;
- private final JournalIndex journalIndex;
- private JournalSegmentWriter writer;
+ private State state;
private boolean open = true;
JournalSegment(
this.file = requireNonNull(file);
this.storageLevel = requireNonNull(storageLevel);
this.maxEntrySize = maxEntrySize;
- journalIndex = new SparseJournalIndex(indexDensity);
- final var fileWriter = switch (storageLevel) {
- case DISK -> new DiskFileWriter(file, maxEntrySize);
- case MAPPED -> new MappedFileWriter(file, maxEntrySize);
- };
+ journalIndex = new SparseJournalIndex(indexDensity);
- // Traverse all entries and push them to index -- thus reconstructing both last index and current position
- writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex,
- indexEntries(fileWriter, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null))
- // relinquish mapped memory
- .toFileChannel();
+ try (var tmpAccess = file.newAccess(storageLevel, maxEntrySize)) {
+ final var fileReader = tmpAccess.newFileReader();
+ state = new Inactive(indexEntries(fileReader, this, maxEntrySize, journalIndex, Long.MAX_VALUE, null));
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
/**
/**
* Acquires a reference to the log segment.
*/
- private void acquire() {
- if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
- writer = writer.toMapped();
+ private Active acquire() {
+ return references.getAndIncrement() == 0 ? activate() : (Active) state;
+ }
+
+ private Active activate() {
+ final Active ret;
+ try {
+ ret = ((Inactive) state).activate(this);
+ } catch (IOException e) {
+ throw new StorageException(e);
}
+ state = ret;
+ return ret;
}
/**
*/
private void release() {
if (references.decrementAndGet() == 0) {
- if (storageLevel == StorageLevel.MAPPED) {
- writer = writer.toFileChannel();
- }
+ state = ((Active) state).deactivate();
if (!open) {
finishClose();
}
*/
JournalSegmentWriter acquireWriter() {
checkOpen();
- acquire();
-
- return writer;
+ return acquire().writer();
}
/**
*/
JournalSegmentReader createReader() {
checkOpen();
- acquire();
- final var buffer = writer.buffer();
- final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
- : new DiskFileReader(file, maxEntrySize);
- final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
+ final var reader = new JournalSegmentReader(this, acquire().access().newFileReader(), maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
}
private void finishClose() {
- writer.close();
try {
file.close();
} catch (IOException e) {
.toString();
}
- static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex, final long maxNextIndex, final @Nullable Position start) {
+ static int indexEntries(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final long maxNextIndex, final @Nullable Position start) {
// acquire ownership of cache and make sure reader does not see anything we've done once we're done
final var fileReader = fileWriter.reader();
try {
- return indexEntries(fileReader, segment, maxEntrySize, journalIndex, maxNextIndex, start);
+ return indexEntries(fileReader, segment, fileWriter.maxEntrySize(), journalIndex, maxNextIndex, start);
} finally {
// Make sure reader does not see anything we've done
fileReader.invalidateCache();
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
private static final char PART_SEPARATOR = '-';
private static final char EXTENSION_SEPARATOR = '.';
private static final String EXTENSION = "log";
- /**
- * Just do not bother with IO smaller than this many bytes.
- */
- private static final int MIN_IO_SIZE = 8192;
private final @NonNull JournalSegmentDescriptor descriptor;
private final @NonNull Path path;
}
/**
- * Map the contents of the file into memory.
+ * Access this file using the specified {@link StorageLevel} and maximum entry size.
*
- * @return A {@link MappedByteBuffer}
+ * @param level a {@link StorageLevel}
+ * @param maxEntrySize maximum size of stored entry
+ * @return A {@link FileAccess}
* @throws IOException if an I/O error occurs
*/
- @NonNull MappedByteBuffer map() throws IOException {
- return channel().map(MapMode.READ_WRITE, 0, maxSize());
+ @NonNull FileAccess newAccess(final StorageLevel level, final int maxEntrySize) throws IOException {
+ return switch (level) {
+ case DISK -> new DiskFileAccess(this, maxEntrySize);
+ case MAPPED -> new MappedFileAccess(this, maxEntrySize, channel().map(MapMode.READ_WRITE, 0, maxSize()));
+ };
}
void close() throws IOException {
file.close();
}
- ByteBuffer allocateBuffer(final int maxEntrySize) {
- return ByteBuffer.allocate(chooseBufferSize(maxEntrySize));
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
}
- private int chooseBufferSize(final int maxEntrySize) {
- final int maxSegmentSize = maxSize();
- if (maxSegmentSize <= MIN_IO_SIZE) {
- // just buffer the entire segment
- return maxSegmentSize;
- }
-
- // one full entry plus its header, or MIN_IO_SIZE, which benefits the read of many small entries
- final int minBufferSize = maxEntrySize + SegmentEntry.HEADER_BYTES;
- return minBufferSize <= MIN_IO_SIZE ? MIN_IO_SIZE : minBufferSize;
- }
-
/**
* Returns a boolean value indicating whether the given file appears to be a parsable segment file.
*
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import io.atomix.storage.journal.JournalSegment.Inactive;
import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import io.netty.buffer.Unpooled;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
private final FileWriter fileWriter;
final @NonNull JournalSegment segment;
private final @NonNull JournalIndex journalIndex;
- final int maxSegmentSize;
- final int maxEntrySize;
private int currentPosition;
- JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex, final int currentPosition) {
+ JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final int currentPosition) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.journalIndex = requireNonNull(journalIndex);
- this.maxEntrySize = maxEntrySize;
this.currentPosition = currentPosition;
- maxSegmentSize = segment.file().maxSize();
}
- JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
- segment = previous.segment;
- journalIndex = previous.journalIndex;
- maxSegmentSize = previous.maxSegmentSize;
- maxEntrySize = previous.maxEntrySize;
- currentPosition = previous.currentPosition;
+ JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final Inactive segmentState) {
this.fileWriter = requireNonNull(fileWriter);
+ this.segment = requireNonNull(segment);
+ this.journalIndex = requireNonNull(journalIndex);
+ currentPosition = segmentState.position();
+ }
+
+ int currentPosition() {
+ return currentPosition;
}
/**
// Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
// way smaller than that.
final int bodyPosition = position + HEADER_BYTES;
- final int avail = maxSegmentSize - bodyPosition;
+ final int avail = segment.file().maxSize() - bodyPosition;
if (avail <= 0) {
// we do not have enough space for the header and a byte: signal a retry
LOG.trace("Not enough space for {} at {}", index, position);
}
// Entry must not exceed maxEntrySize
+ final var maxEntrySize = fileWriter.maxEntrySize();
final var writeLimit = Math.min(avail, maxEntrySize);
// Allocate entry space
currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
// recover position and last written
- : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
+ : JournalSegment.indexEntries(fileWriter, segment, journalIndex, index, nearest);
// Zero the entry header at current channel position.
fileWriter.writeEmptyHeader(currentPosition);
* Flushes written entries to disk.
*/
void flush() {
- fileWriter.flush();
- }
-
- /**
- * Closes this writer.
- */
- void close() {
- fileWriter.close();
- }
-
- /**
- * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
- * buffer.
- *
- * @return the mapped buffer underlying the segment writer, or {@code null}.
- */
- @Nullable MappedByteBuffer buffer() {
- return fileWriter.buffer();
- }
-
- @NonNull JournalSegmentWriter toMapped() {
- final var newWriter = fileWriter.toMapped();
- return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
- }
-
- @NonNull JournalSegmentWriter toFileChannel() {
- final var newWriter = fileWriter.toDisk();
- return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+ try {
+ fileWriter.flush();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.util.internal.PlatformDependent;
+import java.io.UncheckedIOException;
+import java.nio.MappedByteBuffer;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * {@link FileAccess} for {@link StorageLevel#MAPPED}.
+ */
+@NonNullByDefault
+final class MappedFileAccess extends FileAccess {
+ // Memory-mapped view of the entire file, shared by all readers/writers via slice()
+ private final MappedByteBuffer mappedBuffer;
+
+ MappedFileAccess(final JournalSegmentFile file, final int maxEntrySize, final MappedByteBuffer mappedBuffer) {
+ super(file, maxEntrySize);
+ this.mappedBuffer = requireNonNull(mappedBuffer);
+ }
+
+ @Override
+ MappedFileReader newFileReader() {
+ // slice() gives the reader independent position/limit over the same mapped memory
+ return new MappedFileReader(file, mappedBuffer.slice());
+ }
+
+ @Override
+ MappedFileWriter newFileWriter() {
+ return new MappedFileWriter(file, maxEntrySize, mappedBuffer.slice(), () -> {
+ try {
+ mappedBuffer.force();
+ } catch (UncheckedIOException e) {
+ // unwrap to the checked IOException expected from Flushable.flush()
+ throw e.getCause();
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ // Eagerly unmap rather than waiting for GC to release the mapping
+ PlatformDependent.freeDirectBuffer(mappedBuffer);
+ }
+}
MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
super(file);
- this.buffer = buffer.slice().asReadOnlyBuffer();
+ this.buffer = buffer.asReadOnlyBuffer();
}
@Override
*/
package io.atomix.storage.journal;
-import io.netty.util.internal.PlatformDependent;
+import static java.util.Objects.requireNonNull;
+
+import java.io.Flushable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import org.eclipse.jdt.annotation.NonNull;
/**
* A {@link StorageLevel#MAPPED} {@link FileWriter}.
*/
final class MappedFileWriter extends FileWriter {
- private final @NonNull MappedByteBuffer mappedBuffer;
private final MappedFileReader reader;
private final ByteBuffer buffer;
+ private final Flushable flush;
- MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+ MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize, final ByteBuffer buffer,
+ final Flushable flush) {
super(file, maxEntrySize);
-
- try {
- mappedBuffer = file.map();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- buffer = mappedBuffer.slice();
- reader = new MappedFileReader(file, mappedBuffer);
+ this.buffer = requireNonNull(buffer);
+ this.flush = requireNonNull(flush);
+ reader = new MappedFileReader(file, buffer);
}
@Override
return reader;
}
- @Override
- MappedByteBuffer buffer() {
- return mappedBuffer;
- }
-
- @Override
- MappedFileWriter toMapped() {
- return null;
- }
-
- @Override
- DiskFileWriter toDisk() {
- close();
- return new DiskFileWriter(file, maxEntrySize);
- }
-
@Override
void writeEmptyHeader(final int position) {
// Note: we issue a single putLong() instead of two putInt()s.
}
@Override
- void flush() {
- mappedBuffer.force();
- }
-
- @Override
- void close() {
- flush();
- PlatformDependent.freeDirectBuffer(mappedBuffer);
+ void flush() throws IOException {
+ flush.flush();
}
}