JournalIndex index,
JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
- this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
- memory.limit(0);
+ memory = allocMemory(maxEntrySize);
reset(0);
}
+ FileChannelJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+ super(previous);
+ memory = allocMemory(maxEntrySize);
+ lastEntry = previous.getLastEntry();
+ currentPosition = position;
+ }
+
+ private static ByteBuffer allocMemory(int maxEntrySize) {
+ final var buf = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+ buf.limit(0);
+ return buf;
+ }
+
@Override
MappedByteBuffer buffer() {
return null;
@Override
MappedJournalSegmentWriter<E> toMapped() {
- return new MappedJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+ return new MappedJournalSegmentWriter<>(this, (int) currentPosition);
}
@Override
this.firstIndex = segment.index();
}
+ JournalSegmentWriter(JournalSegmentWriter<E> previous) {
+ this.channel = previous.channel;
+ this.segment = previous.segment;
+ this.maxEntrySize = previous.maxEntrySize;
+ this.index = previous.index;
+ this.namespace = previous.namespace;
+ this.firstIndex = previous.firstIndex;
+ }
+
@Override
public final void commit(final long index) {
// FIXME: CONTROLLER-2098: eliminate the need for this method
JournalIndex index,
JournalSerdes namespace) {
super(channel, segment, maxEntrySize, index, namespace);
+ mappedBuffer = mapBuffer(channel, segment.descriptor().maxSegmentSize());
+ buffer = mappedBuffer.slice();
+ reset(0);
+ }
+
+ MappedJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+ super(previous);
+ mappedBuffer = mapBuffer(channel, segment.descriptor().maxSegmentSize());
+ buffer = mappedBuffer.slice();
+ lastEntry = previous.getLastEntry();
+ buffer.position(position);
+ }
+
+ private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) {
try {
- mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
+ return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
} catch (IOException e) {
throw new StorageException(e);
}
- this.buffer = mappedBuffer.slice();
- reset(0);
}
@Override
@Override
FileChannelJournalSegmentWriter<E> toFileChannel() {
+ final int position = buffer.position();
close();
- return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+ return new FileChannelJournalSegmentWriter<>(this, position);
}
@Override