Maintain last known position in JournalIndex
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
index dbf6aec214ffe1ef32f6263477272e12e4adc76f..b18371f04424c015ce2a031dafccf7838bbd36c7 100644 (file)
@@ -19,6 +19,7 @@ import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
 import io.netty.buffer.ByteBuf;
 import java.nio.MappedByteBuffer;
 import org.eclipse.jdt.annotation.NonNull;
@@ -36,7 +37,6 @@ final class JournalSegmentWriter {
     final int maxEntrySize;
 
     private int currentPosition;
-    private Long lastIndex;
 
     JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
             final JournalIndex index) {
@@ -54,7 +54,6 @@ final class JournalSegmentWriter {
         index = previous.index;
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
-        lastIndex = previous.lastIndex;
         currentPosition = previous.currentPosition;
         this.fileWriter = requireNonNull(fileWriter);
     }
@@ -65,7 +64,8 @@ final class JournalSegmentWriter {
      * @return The last written index.
      */
     long getLastIndex() {
-        return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
+        final var last = index.last();
+        return last != null ? last.index() : segment.firstIndex() - 1;
     }
 
     /**
@@ -74,7 +74,8 @@ final class JournalSegmentWriter {
      * @return The next index to be written.
      */
     long getNextIndex() {
-        return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
+        final var last = index.last();
+        return last != null ? last.index() + 1 : segment.firstIndex();
     }
 
     /**
@@ -83,7 +84,7 @@ final class JournalSegmentWriter {
      * @param buf binary data to append
      * @return The index of appended data, or {@code null} if segment has no space
      */
-    Long append(final ByteBuf buf) {
+    Position append(final ByteBuf buf) {
         final var length = buf.readableBytes();
         if (length > maxEntrySize) {
             throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
@@ -113,10 +114,7 @@ final class JournalSegmentWriter {
 
         // Update the last entry with the correct index/term/length.
         currentPosition = nextPosition;
-        lastIndex = index;
-        this.index.index(index, position);
-
-        return index;
+        return this.index.index(index, position);
     }
 
     /**
@@ -149,9 +147,7 @@ final class JournalSegmentWriter {
                 break;
             }
 
-            lastIndex = nextIndex;
-            this.index.index(nextIndex, currentPosition);
-            nextIndex++;
+            this.index.index(nextIndex++, currentPosition);
 
             // Update the current position for indexing.
             currentPosition += HEADER_BYTES + buf.readableBytes();
@@ -169,9 +165,6 @@ final class JournalSegmentWriter {
             return;
         }
 
-        // Reset the last written
-        lastIndex = null;
-
         // Truncate the index.
         this.index.truncate(index);