import com.google.common.base.MoreObjects;
import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
import io.atomix.storage.journal.index.SparseJournalIndex;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.Nullable;
/**
* Log segment.
private final JournalSegmentDescriptor descriptor;
private final StorageLevel storageLevel;
private final int maxEntrySize;
- private final JournalIndex index;
+ private final JournalIndex journalIndex;
private final JournalSerdes namespace;
private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
this.storageLevel = storageLevel;
this.maxEntrySize = maxEntrySize;
this.namespace = namespace;
- index = new SparseJournalIndex(indexDensity);
+ journalIndex = new SparseJournalIndex(indexDensity);
try {
channel = FileChannel.open(file.file().toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
throw new StorageException(e);
}
writer = switch (storageLevel) {
- case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
- case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel();
+ case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
+ case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
+ .toFileChannel();
};
}
return descriptor;
}
+ /**
+ * Looks up the position of the given index.
+ *
+ * @param index the index to lookup
+ * @return the position of the given index or a lesser index, or {@code null}
+ */
+ @Nullable Position lookup(long index) {
+ return journalIndex.lookup(index);
+ }
+
/**
* Acquires a reference to the log segment.
*/
final var buffer = writer.buffer();
final var reader = buffer == null
- ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
- : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
+ ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
+ : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+ reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
}