import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Path;
import org.eclipse.jdt.annotation.NonNull;
/**
// tracks where memory's first available byte maps to in terms of FileChannel.position()
private int bufferPosition;
- DiskFileReader(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
- this(path, channel, allocateBuffer(maxSegmentSize, maxEntrySize));
+ DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+ this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize));
}
// Note: take ownership of the buffer
- DiskFileReader(final Path path, final FileChannel channel, final ByteBuffer buffer) {
- super(path);
+ DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) {
+ super(file);
this.channel = requireNonNull(channel);
this.buffer = buffer.flip();
bufferPosition = 0;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Path;
/**
* A {@link StorageLevel#DISK} {@link FileWriter}.
private final DiskFileReader reader;
private final ByteBuffer buffer;
- DiskFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
- super(path, channel, maxSegmentSize, maxEntrySize);
- buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
- reader = new DiskFileReader(path, channel, buffer);
+ DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+ super(file, channel, maxEntrySize);
+ buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize);
+ reader = new DiskFileReader(file, channel, buffer);
}
@Override
@Override
MappedFileWriter toMapped() {
flush();
- return new MappedFileWriter(path, channel, maxSegmentSize, maxEntrySize);
+ return new MappedFileWriter(file, channel, maxEntrySize);
}
@Override
import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
-import java.nio.file.Path;
import org.eclipse.jdt.annotation.NonNull;
/**
* An abstraction over how to read a {@link JournalSegmentFile}.
*/
abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
- private final Path path;
+ private final JournalSegmentFile file;
- FileReader(final Path path) {
- this.path = requireNonNull(path);
+ FileReader(final JournalSegmentFile file) {
+ this.file = requireNonNull(file);
}
/**
@Override
public final String toString() {
- return MoreObjects.toStringHelper(this).add("path", path).toString();
+ return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
}
}
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Path;
import org.eclipse.jdt.annotation.Nullable;
/**
* An abstraction over how to write a {@link JournalSegmentFile}.
*/
abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
- final Path path;
+ final JournalSegmentFile file;
final FileChannel channel;
- final int maxSegmentSize;
final int maxEntrySize;
- FileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
- this.path = requireNonNull(path);
+ FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+ this.file = requireNonNull(file);
this.channel = requireNonNull(channel);
- this.maxSegmentSize = maxSegmentSize;
this.maxEntrySize = maxEntrySize;
}
@Override
public final String toString() {
- return MoreObjects.toStringHelper(this).add("path", path).toString();
+ return MoreObjects.toStringHelper(this).add("path", file.path()).toString();
}
abstract @Nullable MappedByteBuffer buffer();
*/
package io.atomix.storage.journal;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.MoreObjects;
import io.atomix.storage.journal.index.JournalIndex;
import io.atomix.storage.journal.index.Position;
*/
final class JournalSegment implements AutoCloseable {
private final JournalSegmentFile file;
- private final JournalSegmentDescriptor descriptor;
private final StorageLevel storageLevel;
private final int maxEntrySize;
private final JournalIndex journalIndex;
JournalSegment(
final JournalSegmentFile file,
- final JournalSegmentDescriptor descriptor,
final StorageLevel storageLevel,
final int maxEntrySize,
final double indexDensity) {
- this.file = file;
- this.descriptor = descriptor;
- this.storageLevel = storageLevel;
+ this.file = requireNonNull(file);
+ this.storageLevel = requireNonNull(storageLevel);
this.maxEntrySize = maxEntrySize;
journalIndex = new SparseJournalIndex(indexDensity);
try {
- channel = FileChannel.open(file.file().toPath(),
+ channel = FileChannel.open(file.path(),
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
} catch (IOException e) {
throw new StorageException(e);
}
final var fileWriter = switch (storageLevel) {
- case DISK -> new DiskFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize);
- case MAPPED -> new MappedFileWriter(file.file().toPath(), channel, descriptor.maxSegmentSize(), maxEntrySize);
+ case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
+ case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
};
writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
// relinquish mapped memory
* @return The segment's starting index.
*/
long firstIndex() {
- return descriptor.index();
+ return file.descriptor().index();
}
/**
return file;
}
- /**
- * Returns the segment descriptor.
- *
- * @return The segment descriptor.
- */
- JournalSegmentDescriptor descriptor() {
- return descriptor;
- }
-
/**
* Looks up the position of the given index.
*
acquire();
final var buffer = writer.buffer();
- final var path = file.file().toPath();
- final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
- : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize);
+ final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
+ : new DiskFileReader(file, channel, maxEntrySize);
final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
*/
void delete() {
try {
- Files.deleteIfExists(file.file().toPath());
+ Files.deleteIfExists(file.path());
} catch (IOException e) {
throw new StorageException(e);
}
@Override
public String toString() {
+ final var descriptor = file.descriptor();
return MoreObjects.toStringHelper(this)
.add("id", descriptor.id())
.add("version", descriptor.version())
- .add("index", firstIndex())
+ .add("index", descriptor.index())
.toString();
}
}
import static java.util.Objects.requireNonNull;
+import com.google.common.base.MoreObjects;
import java.io.File;
+import java.nio.file.Path;
+import org.eclipse.jdt.annotation.NonNull;
/**
* Segment file utility.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-public final class JournalSegmentFile {
+final class JournalSegmentFile {
private static final char PART_SEPARATOR = '-';
private static final char EXTENSION_SEPARATOR = '.';
private static final String EXTENSION = "log";
- private final File file;
+ private final @NonNull JournalSegmentDescriptor descriptor;
+ private final @NonNull Path path;
- /**
- * @throws IllegalArgumentException if {@code file} is not a valid segment file
- */
- JournalSegmentFile(final File file) {
- this.file = file;
+ JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) {
+ this.path = requireNonNull(path);
+ this.descriptor = requireNonNull(descriptor);
}
/**
- *
- * @return The segment file.
+ * Returns the path to the segment file.
+ *
+ * @return path of the segment file
 */
- public File file() {
- return file;
+ @NonNull Path path() {
+ return path;
+ }
+
+ /**
+ * Returns the segment descriptor.
+ *
+ * @return The segment descriptor.
+ */
+ @NonNull JournalSegmentDescriptor descriptor() {
+ return descriptor;
+ }
+
+ int maxSize() {
+ return descriptor.maxSegmentSize();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
}
/**
JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
this.segment = requireNonNull(segment);
this.fileReader = requireNonNull(fileReader);
- maxSegmentSize = segment.descriptor().maxSegmentSize();
+ maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
}
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.index = requireNonNull(index);
- maxSegmentSize = segment.descriptor().maxSegmentSize();
+ maxSegmentSize = segment.file().maxSize();
this.maxEntrySize = maxEntrySize;
// adjust lastEntry value
reset(0);
package io.atomix.storage.journal;
import java.nio.ByteBuffer;
-import java.nio.file.Path;
/**
* A {@link StorageLevel#MAPPED} implementation of {@link FileReader}. Operates on direct mapping of the entire file.
final class MappedFileReader extends FileReader {
private final ByteBuffer buffer;
- MappedFileReader(final Path path, final ByteBuffer buffer) {
- super(path);
+ MappedFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
+ super(file);
this.buffer = buffer.slice().asReadOnlyBuffer();
}
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.file.Path;
import org.eclipse.jdt.annotation.NonNull;
/**
private final MappedFileReader reader;
private final ByteBuffer buffer;
- MappedFileWriter(final Path path, final FileChannel channel, final int maxSegmentSize, final int maxEntrySize) {
- super(path, channel, maxSegmentSize, maxEntrySize);
+ MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+ super(file, channel, maxEntrySize);
- mappedBuffer = mapBuffer(channel, maxSegmentSize);
+ mappedBuffer = mapBuffer(channel, file.maxSize());
buffer = mappedBuffer.slice();
- reader = new MappedFileReader(path, mappedBuffer);
+ reader = new MappedFileReader(file, mappedBuffer);
}
private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
@Override
DiskFileWriter toDisk() {
close();
- return new DiskFileWriter(path, channel, maxSegmentSize, maxEntrySize);
+ return new DiskFileWriter(file, channel, maxEntrySize);
}
@Override
*/
private synchronized void open() {
// Load existing log segments from disk.
- for (JournalSegment segment : loadSegments()) {
- segments.put(segment.descriptor().index(), segment);
+ for (var segment : loadSegments()) {
+ segments.put(segment.firstIndex(), segment);
}
// If a segment doesn't already exist, create an initial segment starting at index 1.
final var index = currentSegment.lastIndex() + 1;
final var lastSegment = getLastSegment();
- currentSegment = createSegment(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1, index);
+ currentSegment = createSegment(lastSegment != null ? lastSegment.file().descriptor().id() + 1 : 1, index);
segments.put(index, currentSegment);
return currentSegment;
}
throw new StorageException(e);
}
- final var segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+ final var segment = newSegment(new JournalSegmentFile(segmentFile.toPath(), descriptor));
LOG.debug("Created segment: {}", segment);
return segment;
}
* Creates a new segment instance.
*
* @param segmentFile The segment file.
- * @param descriptor The segment descriptor.
* @return The segment instance.
*/
- protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
- return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
+ protected JournalSegment newSegment(JournalSegmentFile segmentFile) {
+ return new JournalSegment(segmentFile, storageLevel, maxEntrySize, indexDensity);
}
/**
// If the file looks like a segment file, attempt to load the segment.
if (JournalSegmentFile.isSegmentFile(name, file)) {
+ final var path = file.toPath();
// read the descriptor
final JournalSegmentDescriptor descriptor;
try {
- descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
+ descriptor = JournalSegmentDescriptor.readFrom(path);
} catch (IOException e) {
throw new StorageException(e);
}
// Load the segment.
- final var segmentFile = new JournalSegmentFile(file);
- final var segment = newSegment(segmentFile, descriptor);
- LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
+ final var segment = newSegment(new JournalSegmentFile(path, descriptor));
+ LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), path);
// Add the segment to the segments list.
segments.put(segment.firstIndex(), segment);
while (iterator.hasNext()) {
final var segment = iterator.next().getValue();
if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
- LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
- previousSegment.file().file());
+ LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().path(),
+ previousSegment.file().path());
corrupted = true;
}
if (corrupted) {