Perform proper handoff between JournalSegmentWriters 76/110576/3
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 10 Mar 2024 13:27:28 +0000 (14:27 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 11 Mar 2024 12:51:31 +0000 (13:51 +0100)
Switching between JournalSegmentWriter implementations is a matter of
just handing off state, which is limited to the last entry and position
in the file.

This patch introduces alternative constructors, which allow this state
to be moved to the new implementation without needing to re-read the
file.

JIRA: CONTROLLER-2043
Change-Id: I796cb4da4e09dfe6332ca82a7dfdb76740e30164
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java

index 75bd9ac7e47483b062be62ebb81f678971fa9f1f..037c20d3293e1c2a14bb2f530de9b4c6c4ff384d 100644 (file)
@@ -55,11 +55,23 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
       JournalIndex index,
       JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
-    this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
-    memory.limit(0);
+    memory = allocMemory(maxEntrySize);
     reset(0);
   }
 
+  FileChannelJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+    super(previous);
+    memory = allocMemory(maxEntrySize);
+    lastEntry = previous.getLastEntry();
+    currentPosition = position;
+  }
+
+  private static ByteBuffer allocMemory(int maxEntrySize) {
+    final var buf = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+    buf.limit(0);
+    return buf;
+  }
+
   @Override
   MappedByteBuffer buffer() {
     return null;
@@ -67,7 +79,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   MappedJournalSegmentWriter<E> toMapped() {
-    return new MappedJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+    return new MappedJournalSegmentWriter<>(this, (int) currentPosition);
   }
 
   @Override
index b3c4fd04988f7ec9c9eaa68c794ad96c88f40b04..54661a2b47ec3bc1195a85e00a05a99e618c5d51 100644 (file)
@@ -34,6 +34,15 @@ abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
         this.firstIndex = segment.index();
     }
 
+    JournalSegmentWriter(JournalSegmentWriter<E> previous) {
+        this.channel = previous.channel;
+        this.segment = previous.segment;
+        this.maxEntrySize = previous.maxEntrySize;
+        this.index = previous.index;
+        this.namespace = previous.namespace;
+        this.firstIndex = previous.firstIndex;
+    }
+
     @Override
     public final void commit(final long index) {
         // FIXME: CONTROLLER-2098: eliminate the need for this method
index f5d4ed9d04a9554de4ec935c6b54b85720158973..2a479ed192e3d5e0b95403b3f7ede07ce4602a59 100644 (file)
@@ -55,13 +55,25 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
       JournalIndex index,
       JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
+    mappedBuffer = mapBuffer(channel, segment.descriptor().maxSegmentSize());
+    buffer = mappedBuffer.slice();
+    reset(0);
+  }
+
+  MappedJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+    super(previous);
+    mappedBuffer = mapBuffer(channel, segment.descriptor().maxSegmentSize());
+    buffer = mappedBuffer.slice();
+    lastEntry = previous.getLastEntry();
+    buffer.position(position);
+  }
+
+  private static @NonNull MappedByteBuffer mapBuffer(FileChannel channel, int maxSegmentSize) {
     try {
-      mappedBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
+      return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
     } catch (IOException e) {
       throw new StorageException(e);
     }
-    this.buffer = mappedBuffer.slice();
-    reset(0);
   }
 
   @Override
@@ -76,8 +88,9 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   FileChannelJournalSegmentWriter<E> toFileChannel() {
+    final int position = buffer.position();
     close();
-    return new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
+    return new FileChannelJournalSegmentWriter<>(this, position);
   }
 
   @Override