Having a RandomAccessFile is nice, as it internally holds a FileChannel.
This makes our files stateful -- which is okay, as we still manage their
lifecycle via JournalSegment.
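
For reference, a minimal standalone sketch of the JDK contract this change
relies on (class and file names here are illustrative only): a
RandomAccessFile owns exactly one FileChannel, getChannel() always returns
that same instance, and closing the file closes the channel.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.file.Files;

    public final class RafChannelSketch {
        public static void main(final String[] args) throws IOException {
            final var path = Files.createTempFile("segment", ".log");
            final var raf = new RandomAccessFile(path.toFile(), "rw");

            // getChannel() is idempotent: the same FileChannel instance
            // comes back every time, so the file effectively owns it
            final var channel = raf.getChannel();
            System.out.println(channel == raf.getChannel()); // true

            // closing the RandomAccessFile also closes its channel, which
            // is why JournalSegment only needs to close the file itself
            raf.close();
            System.out.println(channel.isOpen()); // false

            Files.delete(path);
        }
    }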
JIRA: CONTROLLER-2099
Change-Id: Id8305c74dbd881eaf52d84191c11bb4ea2bc164b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
package io.atomix.storage.journal;
import static com.google.common.base.Verify.verify;
-import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.nio.ByteBuffer;
// tracks where memory's first available byte maps to in terms of FileChannel.position()
private int bufferPosition;
- DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
- this(file, channel, allocateBuffer(file.maxSize(), maxEntrySize));
+ DiskFileReader(final JournalSegmentFile file, final int maxEntrySize) {
+ this(file, allocateBuffer(file.maxSize(), maxEntrySize));
}
// Note: take ownership of the buffer
- DiskFileReader(final JournalSegmentFile file, final FileChannel channel, final ByteBuffer buffer) {
+ DiskFileReader(final JournalSegmentFile file, final ByteBuffer buffer) {
super(file);
- this.channel = requireNonNull(channel);
+ channel = file.channel();
this.buffer = buffer.flip();
bufferPosition = 0;
}
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
private final DiskFileReader reader;
+ private final FileChannel channel;
private final ByteBuffer buffer;
- DiskFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
- super(file, channel, maxEntrySize);
+ DiskFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+ super(file, maxEntrySize);
+ channel = file.channel();
buffer = DiskFileReader.allocateBuffer(file.maxSize(), maxEntrySize);
- reader = new DiskFileReader(file, channel, buffer);
+ reader = new DiskFileReader(file, buffer);
}
@Override
MappedFileWriter toMapped() {
flush();
- return new MappedFileWriter(file, channel, maxEntrySize);
+ return new MappedFileWriter(file, maxEntrySize);
}
@Override
/**
 * An abstraction over how to read a {@link JournalSegmentFile}.
*/
abstract sealed class FileReader permits DiskFileReader, MappedFileReader {
- private final JournalSegmentFile file;
+ private final @NonNull JournalSegmentFile file;
FileReader(final JournalSegmentFile file) {
this.file = requireNonNull(file);
import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import org.eclipse.jdt.annotation.Nullable;
/**
 * An abstraction over how to write a {@link JournalSegmentFile}.
 */
abstract sealed class FileWriter permits DiskFileWriter, MappedFileWriter {
final JournalSegmentFile file;
- final FileChannel channel;
final int maxEntrySize;
- FileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
+ FileWriter(final JournalSegmentFile file, final int maxEntrySize) {
this.file = requireNonNull(file);
- this.channel = requireNonNull(channel);
this.maxEntrySize = maxEntrySize;
}
import io.atomix.storage.journal.index.Position;
import io.atomix.storage.journal.index.SparseJournalIndex;
import java.io.IOException;
-import java.nio.channels.FileChannel;
import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
private final JournalIndex journalIndex;
private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
- private final FileChannel channel;
private JournalSegmentWriter writer;
private boolean open = true;
this.storageLevel = requireNonNull(storageLevel);
this.maxEntrySize = maxEntrySize;
journalIndex = new SparseJournalIndex(indexDensity);
- try {
- channel = FileChannel.open(file.path(),
- StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
- } catch (IOException e) {
- throw new StorageException(e);
- }
final var fileWriter = switch (storageLevel) {
- case DISK -> new DiskFileWriter(file, channel, maxEntrySize);
- case MAPPED -> new MappedFileWriter(file, channel, maxEntrySize);
+ case DISK -> new DiskFileWriter(file, maxEntrySize);
+ case MAPPED -> new MappedFileWriter(file, maxEntrySize);
};
writer = new JournalSegmentWriter(fileWriter, this, maxEntrySize, journalIndex)
// relinquish mapped memory
return writer.getLastIndex();
}
- /**
- * Returns the size of the segment.
- *
- * @return the size of the segment
- */
- int size() {
- try {
- return (int) channel.size();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
/**
* Returns the segment file.
*
acquire();
final var buffer = writer.buffer();
- final var fileReader = buffer != null ? new MappedFileReader(file, buffer)
- : new DiskFileReader(file, channel, maxEntrySize);
+ final var fileReader = buffer != null ? new MappedFileReader(file, buffer) : new DiskFileReader(file, maxEntrySize);
final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
private void finishClose() {
writer.close();
try {
- channel.close();
+ file.close();
} catch (IOException e) {
throw new StorageException(e);
}
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
+import java.nio.channels.ReadableByteChannel;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
static final int VERSION = 1;
/**
- * Read a JournalSegmentDescriptor from a {@link Path}.
+ * Read a JournalSegmentDescriptor from a {@link ReadableByteChannel}.
*
- * @param path path to read from
+ * @param channel channel to read from
* @return A {@link JournalSegmentDescriptor}
* @throws IOException if an I/O error occurs or there is not enough data
*/
- public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException {
- final byte[] bytes;
- try (var is = Files.newInputStream(path, StandardOpenOption.READ)) {
- bytes = is.readNBytes(BYTES);
+ public static @NonNull JournalSegmentDescriptor readFrom(final ReadableByteChannel channel) throws IOException {
+ final var buffer = ByteBuffer.allocate(BYTES);
+ final var read = channel.read(buffer);
+ if (read != BYTES) {
+ throw new IOException("Need " + BYTES + " bytes, only " + read + " available");
}
- if (bytes.length != BYTES) {
- throw new IOException("Need " + BYTES + " bytes, only " + bytes.length + " available");
- }
-
- final var buffer = ByteBuffer.wrap(bytes);
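+ // flip from filling to draining before decoding the descriptor fields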
+ buffer.flip();
return new JournalSegmentDescriptor(
buffer.getInt(),
buffer.getLong(),
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
import java.nio.file.Path;
import org.eclipse.jdt.annotation.NonNull;
private final @NonNull JournalSegmentDescriptor descriptor;
private final @NonNull Path path;
- private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor) {
+ private final RandomAccessFile file;
+
+ private JournalSegmentFile(final Path path, final JournalSegmentDescriptor descriptor,
+ final RandomAccessFile file) {
this.path = requireNonNull(path);
this.descriptor = requireNonNull(descriptor);
+ this.file = requireNonNull(file);
}
static @NonNull JournalSegmentFile createNew(final String name, final File directory,
final JournalSegmentDescriptor descriptor) throws IOException {
final var file = createSegmentFile(name, directory, descriptor.id());
- try (var raf = new RandomAccessFile(file, "rw")) {
+ final var raf = new RandomAccessFile(file, "rw");
+ try {
raf.setLength(descriptor.maxSegmentSize());
raf.write(descriptor.toArray());
+ } catch (IOException e) {
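+ // close the file on failure so the descriptor does not leak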
+ raf.close();
+ throw e;
}
- return new JournalSegmentFile(file.toPath(), descriptor);
+ return new JournalSegmentFile(file.toPath(), descriptor, raf);
}
static @NonNull JournalSegmentFile openExisting(final Path path) throws IOException {
- // read the descriptor
- final var descriptor = JournalSegmentDescriptor.readFrom(path);
- return new JournalSegmentFile(path, descriptor);
+ final var raf = new RandomAccessFile(path.toFile(), "rw");
+ final JournalSegmentDescriptor descriptor;
+ try {
+ // read the descriptor
+ descriptor = JournalSegmentDescriptor.readFrom(raf.getChannel());
+ } catch (IOException e) {
+ raf.close();
+ throw e;
+ }
+ return new JournalSegmentFile(path, descriptor, raf);
}
/**
return descriptor.maxSegmentSize();
}
+ int size() throws IOException {
+ return (int) file.length();
+ }
+
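+ // always the same instance: a RandomAccessFile holds exactly one FileChannel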
+ FileChannel channel() {
+ return file.getChannel();
+ }
+
+ void close() throws IOException {
+ file.close();
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("path", path).add("descriptor", descriptor).toString();
private final MappedFileReader reader;
private final ByteBuffer buffer;
- MappedFileWriter(final JournalSegmentFile file, final FileChannel channel, final int maxEntrySize) {
- super(file, channel, maxEntrySize);
+ MappedFileWriter(final JournalSegmentFile file, final int maxEntrySize) {
+ super(file, maxEntrySize);
- mappedBuffer = mapBuffer(channel, file.maxSize());
+ mappedBuffer = mapBuffer(file.channel(), file.maxSize());
buffer = mappedBuffer.slice();
reader = new MappedFileReader(file, mappedBuffer);
}
@Override
DiskFileWriter toDisk() {
close();
- return new DiskFileWriter(file, channel, maxEntrySize);
+ return new DiskFileWriter(file, maxEntrySize);
}
@Override
*/
public long size() {
return segments.values().stream()
- .mapToLong(JournalSegment::size)
+ .mapToLong(segment -> {
+ try {
+ return segment.file().size();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ })
.sum();
}