final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
- private final FileChannel channel;
private final ByteBuffer memory;
private Indexed<E> lastEntry;
private long currentPosition;
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
- super(segment, maxEntrySize, index, namespace);
- this.channel = channel;
+ super(channel, segment, maxEntrySize, index, namespace);
this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
memory.limit(0);
reset(0);
return null;
}
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return new MappedJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+ }
+
+ @Override
+ FileChannelJournalSegmentWriter<E> toFileChannel() {
+ return this;
+ }
+
@Override
public void reset(long index) {
long nextIndex = firstIndex;
import io.atomix.storage.journal.index.JournalIndex;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
permits FileChannelJournalSegmentWriter, MappedJournalSegmentWriter {
+ final @NonNull FileChannel channel;
final @NonNull JournalSegment<E> segment;
final int maxEntrySize;
final @NonNull JournalIndex index;
final @NonNull JournalSerdes namespace;
final long firstIndex;
- JournalSegmentWriter(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
- final JournalSerdes namespace) {
+ JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+ final JournalIndex index, final JournalSerdes namespace) {
+ this.channel = requireNonNull(channel);
this.segment = requireNonNull(segment);
this.maxEntrySize = maxEntrySize;
this.index = requireNonNull(index);
* @return the mapped buffer underlying the segment writer, or {@code null}.
*/
abstract @Nullable MappedByteBuffer buffer();
+
+ abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+
+ abstract @NonNull FileChannelJournalSegmentWriter<E> toFileChannel();
}
final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
private final FileChannel channel;
private final JournalSegment<E> segment;
- private final int maxEntrySize;
- private final JournalIndex index;
- private final JournalSerdes namespace;
private JournalSegmentWriter<E> writer;
MappableJournalSegmentWriter(
JournalSerdes namespace) {
this.channel = channel;
this.segment = segment;
- this.maxEntrySize = maxEntrySize;
- this.index = index;
- this.namespace = namespace;
this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
}
* @return the buffer that was mapped into memory
*/
MappedByteBuffer map() {
- if (writer instanceof MappedJournalSegmentWriter) {
- return ((MappedJournalSegmentWriter<E>) writer).buffer();
- }
-
- try {
- JournalSegmentWriter<E> writer = this.writer;
- MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
- this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
- writer.close();
- return buffer;
- } catch (IOException e) {
- throw new StorageException(e);
- }
+ final var mapped = writer.toMapped();
+ writer = mapped;
+ return mapped.buffer();
}
/**
* Unmaps the mapped buffer.
*/
void unmap() {
- if (writer instanceof MappedJournalSegmentWriter) {
- JournalSegmentWriter<E> writer = this.writer;
- this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
- writer.close();
- }
+ writer = writer.toFileChannel();
}
MappedByteBuffer buffer() {
*/
package io.atomix.storage.journal;
-import static java.util.Objects.requireNonNull;
-
import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
private Indexed<E> lastEntry;
MappedJournalSegmentWriter(
- MappedByteBuffer buffer,
+ FileChannel channel,
JournalSegment<E> segment,
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
- super(segment, maxEntrySize, index, namespace);
- this.mappedBuffer = requireNonNull(buffer);
- this.buffer = buffer.slice();
+ super(channel, segment, maxEntrySize, index, namespace);
+ try {
+ mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ this.buffer = mappedBuffer.slice();
reset(0);
}
return mappedBuffer;
}
+ @Override
+ MappedJournalSegmentWriter<E> toMapped() {
+ return this;
+ }
+
+ @Override
+ FileChannelJournalSegmentWriter<E> toFileChannel() {
+ close();
+ return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+ }
+
@Override
public void reset(long index) {
long nextIndex = firstIndex;