X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=atomix-storage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FJournalSegment.java;h=d21d9051b6e6c97fc71467cf186df4f48ea0e353;hp=3d5ab7ade9883d07712be09c978cc2db40839168;hb=88a1e0bc7269500b63f519e20145ae6f751485f1;hpb=926877f74ff564663d1e892cbad4c13a25e7e373 diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index 3d5ab7ade9..d21d9051b6 100644 --- a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -18,6 +18,7 @@ package io.atomix.storage.journal; import com.google.common.base.MoreObjects; import io.atomix.storage.journal.index.JournalIndex; +import io.atomix.storage.journal.index.Position; import io.atomix.storage.journal.index.SparseJournalIndex; import java.io.IOException; import java.nio.channels.FileChannel; @@ -26,6 +27,7 @@ import java.nio.file.StandardOpenOption; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jdt.annotation.Nullable; /** * Log segment. @@ -37,7 +39,7 @@ final class JournalSegment implements AutoCloseable { private final JournalSegmentDescriptor descriptor; private final StorageLevel storageLevel; private final int maxEntrySize; - private final JournalIndex index; + private final JournalIndex journalIndex; private final JournalSerdes namespace; private final Set> readers = ConcurrentHashMap.newKeySet(); private final AtomicInteger references = new AtomicInteger(); @@ -58,7 +60,7 @@ final class JournalSegment implements AutoCloseable { this.storageLevel = storageLevel; this.maxEntrySize = maxEntrySize; this.namespace = namespace; - index = new SparseJournalIndex(indexDensity); + journalIndex = new SparseJournalIndex(indexDensity); try { channel = FileChannel.open(file.file().toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); @@ -66,8 +68,9 @@ final class JournalSegment implements AutoCloseable { throw new StorageException(e); } writer = switch (storageLevel) { - case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace); - case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel(); + case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace); + case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace) + .toFileChannel(); }; } @@ -120,6 +123,16 @@ final class JournalSegment implements AutoCloseable { return descriptor; } + /** + * Looks up the position of the given index. + * + * @param index the index to lookup + * @return the position of the given index or a lesser index, or {@code null} + */ + @Nullable Position lookup(long index) { + return journalIndex.lookup(index); + } + /** * Acquires a reference to the log segment. */ @@ -173,8 +186,9 @@ final class JournalSegment implements AutoCloseable { final var buffer = writer.buffer(); final var reader = buffer == null - ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace) - : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace); + ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace) + : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace); + reader.setPosition(JournalSegmentDescriptor.BYTES); readers.add(reader); return reader; }