Simplify FileChannelJournalSegmentWriter position tracking 72/110572/3
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 9 Mar 2024 17:13:01 +0000 (18:13 +0100)
committerRobert Varga <nite@hq.sk>
Mon, 11 Mar 2024 12:51:11 +0000 (12:51 +0000)
This patch makes the obvious switch in state tracking: memory buffer
does not hold anything of value w.r.t. overall state, so there is no
point in marking/resetting and maintaining memory.position() beyond
what we need for reading from it.

Local 'position' acts now as the authoritative source of where we want
to go and we propagate it to channel position when we validate an entry.

JIRA: CONTROLLER-2095
Change-Id: Iab150a3c7348c714170223b397167cd4bb87f087
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
atomix-storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java

index 39d23636769a351a2fb866f4e38668732dd60c04..ed9ff922f7da6aa6ad816239ba4fa8a9525014c2 100644 (file)
@@ -86,7 +86,6 @@ class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
       memory.flip();
 
       // Read the entry length.
-      memory.mark();
       int length = memory.getInt();
 
       // If the length is non-zero, read the entry.
@@ -108,37 +107,28 @@ class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
           break;
         }
 
-        int limit = memory.limit();
-        memory.limit(memory.position() + length);
-        final E entry = namespace.deserialize(memory);
-        memory.limit(limit);
+        entryBytes.rewind();
+        final E entry = namespace.deserialize(entryBytes);
         lastEntry = new Indexed<>(nextIndex, entry, length);
         this.index.index(nextIndex, (int) position);
         nextIndex++;
 
         // Update the current position for indexing.
-        position = channel.position() + memory.position();
+        position = position + Integer.BYTES + Integer.BYTES + length;
+        channel.position(position);
+        memory.position(memory.position() + length);
 
         // Read more bytes from the segment if necessary.
         if (memory.remaining() < maxEntrySize) {
           memory.clear();
-          channel.position(position);
           channel.read(memory, position);
           memory.flip();
         }
 
-        memory.mark();
         length = memory.getInt();
       }
-
-      // Reset the buffer to the previous mark.
-      channel.position(channel.position() + memory.reset().position());
     } catch (BufferUnderflowException e) {
-      try {
-        channel.position(channel.position() + memory.reset().position());
-      } catch (IOException e2) {
-        throw new StorageException(e2);
-      }
+      // No-op, position is only updated on success
     } catch (IOException e) {
       throw new StorageException(e);
     }