Move entry tracking to SegmentedJournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegment.java
index 3d5ab7ade9883d07712be09c978cc2db40839168..d21d9051b6e6c97fc71467cf186df4f48ea0e353 100644 (file)
@@ -18,6 +18,7 @@ package io.atomix.storage.journal;
 
 import com.google.common.base.MoreObjects;
 import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
 import io.atomix.storage.journal.index.SparseJournalIndex;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -26,6 +27,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.Nullable;
 
 /**
  * Log segment.
@@ -37,7 +39,7 @@ final class JournalSegment<E> implements AutoCloseable {
   private final JournalSegmentDescriptor descriptor;
   private final StorageLevel storageLevel;
   private final int maxEntrySize;
-  private final JournalIndex index;
+  private final JournalIndex journalIndex;
   private final JournalSerdes namespace;
   private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
@@ -58,7 +60,7 @@ final class JournalSegment<E> implements AutoCloseable {
     this.storageLevel = storageLevel;
     this.maxEntrySize = maxEntrySize;
     this.namespace = namespace;
-    index = new SparseJournalIndex(indexDensity);
+    journalIndex = new SparseJournalIndex(indexDensity);
     try {
       channel = FileChannel.open(file.file().toPath(),
         StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
@@ -66,8 +68,9 @@ final class JournalSegment<E> implements AutoCloseable {
       throw new StorageException(e);
     }
     writer = switch (storageLevel) {
-        case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
-        case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel();
+        case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
+        case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
+            .toFileChannel();
     };
   }
 
@@ -120,6 +123,16 @@ final class JournalSegment<E> implements AutoCloseable {
     return descriptor;
   }
 
+  /**
+   * Looks up the position of the given index.
+   *
+   * @param index the index to lookup
+   * @return the position of the given index or a lesser index, or {@code null}
+   */
+  @Nullable Position lookup(long index) {
+    return journalIndex.lookup(index);
+  }
+
   /**
    * Acquires a reference to the log segment.
    */
@@ -173,8 +186,9 @@ final class JournalSegment<E> implements AutoCloseable {
 
     final var buffer = writer.buffer();
     final var reader = buffer == null
-      ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
-        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
+      ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
+        : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+    reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
     return reader;
   }