import static java.util.Objects.requireNonNull;
import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
+import java.util.NoSuchElementException;
+import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
final int maxEntrySize;
- final JournalIndex index;
+ private final JournalIndex index;
final JournalSerdes namespace;
- final long firstIndex;
+ private final long firstIndex;
+ private final JournalSegment<E> segment;
+
+ private Indexed<E> currentEntry;
+ private Indexed<E> nextEntry;
JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
final JournalSerdes namespace) {
+ this.segment = requireNonNull(segment);
this.maxEntrySize = maxEntrySize;
this.index = requireNonNull(index);
this.namespace = requireNonNull(namespace);
- this.firstIndex = segment.index();
+ firstIndex = segment.index();
}
@Override
return firstIndex;
}
+ @Override
+ public final long getCurrentIndex() {
+ return currentEntry != null ? currentEntry.index() : 0;
+ }
+
+ @Override
+ public final Indexed<E> getCurrentEntry() {
+ return currentEntry;
+ }
+
+ @Override
+ public final long getNextIndex() {
+ return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
+ }
+
+ @Override
+ public final boolean hasNext() {
+ return nextEntry != null || (nextEntry = readNext()) != null;
+ }
+
+ @Override
+ public final Indexed<E> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ // Set the current entry to the next entry.
+ currentEntry = nextEntry;
+
+ // Reset the next entry to null.
+ nextEntry = null;
+
+ // Read the next entry in the segment.
+ nextEntry = readNext();
+
+ // Return the current entry.
+ return currentEntry;
+ }
+
+ @Override
+ public final void reset() {
+ currentEntry = null;
+ nextEntry = null;
+ setPosition(JournalSegmentDescriptor.BYTES);
+ nextEntry = readNext();
+ }
+
+ @Override
+ public final void reset(final long index) {
+ reset();
+ Position position = this.index.lookup(index - 1);
+ if (position != null) {
+ currentEntry = new Indexed<>(position.index() - 1, null, 0);
+ setPosition(position.position());
+ nextEntry = readNext();
+ }
+ while (getNextIndex() < index && hasNext()) {
+ next();
+ }
+ }
+
@Override
public final void close() {
- // FIXME: CONTROLLER-2098: remove this method
+ segment.closeReader(this);
}
+
+ /**
+ * Set the file position.
+ *
+ * @param position new position
+ */
+ abstract void setPosition(int position);
+
+ /**
+ * Reads the next entry in the segment.
+ *
+ * @return Next entry, or {@code null}
+ */
+ abstract @Nullable Indexed<E> readNext();
}