*/
package io.atomix.storage.journal;
-import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
FileChannel channel,
JournalSegment<E> segment,
int maxEntrySize,
- JournalIndex index,
JournalSerdes namespace) {
- super(segment, maxEntrySize, index, namespace);
+ super(segment, maxEntrySize, namespace);
this.channel = channel;
this.memory = ByteBuffer.allocate((maxEntrySize + SegmentEntry.HEADER_BYTES) * 2);
- reset();
}
@Override
*/
package io.atomix.storage.journal;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* Indexed journal entry.
* @param entry the indexed entry
* @param size the serialized entry size
*/
-// FIXME: add @NonNullByDefault and enforce non-null entry once we can say that entries cannot be null
// FIXME: it seems 'index' has to be non-zero, we should enforce that if that really is the case
// FIXME: it seems 'size' has not be non-zero, we should enforce that if that really is the case
+@NonNullByDefault
public record Indexed<E>(long index, E entry, int size) {
+ public Indexed {
+ requireNonNull(entry);
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("index", index).add("entry", entry).toString();
import com.google.common.base.MoreObjects;
import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
import io.atomix.storage.journal.index.SparseJournalIndex;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jdt.annotation.Nullable;
/**
* Log segment.
private final JournalSegmentDescriptor descriptor;
private final StorageLevel storageLevel;
private final int maxEntrySize;
- private final JournalIndex index;
+ private final JournalIndex journalIndex;
private final JournalSerdes namespace;
private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
this.storageLevel = storageLevel;
this.maxEntrySize = maxEntrySize;
this.namespace = namespace;
- index = new SparseJournalIndex(indexDensity);
+ journalIndex = new SparseJournalIndex(indexDensity);
try {
channel = FileChannel.open(file.file().toPath(),
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
throw new StorageException(e);
}
writer = switch (storageLevel) {
- case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
- case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace).toFileChannel();
+ case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
+ case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
+ .toFileChannel();
};
}
return descriptor;
}
+ /**
+ * Looks up the position of the given index.
+ *
+ * @param index the index to lookup
+ * @return the position of the given index or a lesser index, or {@code null}
+ */
+ @Nullable Position lookup(long index) {
+ return journalIndex.lookup(index);
+ }
+
/**
* Acquires a reference to the log segment.
*/
final var buffer = writer.buffer();
final var reader = buffer == null
- ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
- : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
+ ? new DiskJournalSegmentReader<>(channel, this, maxEntrySize, namespace)
+ : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, namespace);
+ reader.setPosition(JournalSegmentDescriptor.BYTES);
readers.add(reader);
return reader;
}
import static java.util.Objects.requireNonNull;
-import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
import org.eclipse.jdt.annotation.Nullable;
abstract sealed class JournalSegmentReader<E> permits DiskJournalSegmentReader, MappedJournalSegmentReader {
final int maxEntrySize;
- private final JournalIndex index;
final JournalSerdes namespace;
- private final long firstIndex;
private final JournalSegment<E> segment;
- private Indexed<E> currentEntry;
- private Indexed<E> nextEntry;
-
- JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
- final JournalSerdes namespace) {
+ JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalSerdes namespace) {
this.segment = requireNonNull(segment);
this.maxEntrySize = maxEntrySize;
- this.index = requireNonNull(index);
this.namespace = requireNonNull(namespace);
- firstIndex = segment.index();
- }
-
- /**
- * Returns the last read entry.
- *
- * @return The last read entry.
- */
- final Indexed<E> getCurrentEntry() {
- return currentEntry;
- }
-
- /**
- * Returns the next reader index.
- *
- * @return The next reader index.
- */
- final long getNextIndex() {
- return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
- }
-
- /**
- * Returns the next entry in the reader.
- *
- * @return The next entry in the reader, or {@code null}
- */
- final @Nullable Indexed<E> tryNext() {
- if (nextEntry == null) {
- nextEntry = readNext();
- }
- if (nextEntry == null) {
- return null;
- }
-
- // Set the current entry to the next entry.
- currentEntry = nextEntry;
-
- // Reset the next entry to null.
- nextEntry = null;
-
- // Read the next entry in the segment.
- nextEntry = readNext();
-
- // Return the current entry.
- return currentEntry;
- }
-
- /**
- * Resets the reader to the start of the segment.
- */
- final void reset() {
- currentEntry = null;
- nextEntry = null;
- setPosition(JournalSegmentDescriptor.BYTES);
- nextEntry = readNext();
- }
-
- /**
- * Resets the reader to the given index.
- *
- * @param index The index to which to reset the reader.
- */
- final void reset(final long index) {
- reset();
- Position position = this.index.lookup(index - 1);
- if (position != null) {
- // FIXME: why do we need a 'null'-based entry here?
- currentEntry = new Indexed<>(position.index() - 1, null, 0);
- setPosition(position.position());
- nextEntry = readNext();
- }
- while (getNextIndex() < index && tryNext() != null) {
- // Nothing else
- }
}
/**
* @return The entry, or {@code null}
*/
abstract @Nullable Indexed<E> readEntry(long index);
-
- private @Nullable Indexed<E> readNext() {
- // Compute the index of the next entry in the segment.
- return readEntry(getNextIndex());
- }
}
*/
package io.atomix.storage.journal;
-import io.atomix.storage.journal.index.JournalIndex;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;
ByteBuffer buffer,
JournalSegment<E> segment,
int maxEntrySize,
- JournalIndex index,
JournalSerdes namespace) {
- super(segment, maxEntrySize, index, namespace);
+ super(segment, maxEntrySize, namespace);
this.buffer = buffer.slice();
- reset();
}
@Override
* A {@link JournalReader} traversing all entries.
*/
sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
- final SegmentedJournal<E> journal;
- private JournalSegment<E> currentSegment;
- private Indexed<E> previousEntry;
- private JournalSegmentReader<E> currentReader;
-
- SegmentedJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
- this.journal = requireNonNull(journal);
- currentSegment = requireNonNull(segment);
- currentReader = segment.createReader();
- }
-
- @Override
- public final long getFirstIndex() {
- return journal.getFirstSegment().index();
- }
-
- @Override
- public final long getCurrentIndex() {
- final var currentEntry = currentReader.getCurrentEntry();
- if (currentEntry != null) {
- final long currentIndex = currentEntry.index();
- if (currentIndex != 0) {
- return currentIndex;
- }
+ final SegmentedJournal<E> journal;
+
+ private JournalSegment<E> currentSegment;
+ private JournalSegmentReader<E> currentReader;
+ private Indexed<E> currentEntry;
+ private long nextIndex;
+
+ SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
+ this.journal = requireNonNull(journal);
+ currentSegment = requireNonNull(segment);
+ currentReader = segment.createReader();
+ nextIndex = currentSegment.index();
+ currentEntry = null;
}
- return previousEntry != null ? previousEntry.index() : 0;
- }
-
- @Override
- public final Indexed<E> getCurrentEntry() {
- // If previousEntry was the last in the previous segment, we may have moved currentReader to the next segment.
- // That segment may be empty, though, in which case we need to report the previousEntry.
- final Indexed<E> currentEntry;
- return (currentEntry = currentReader.getCurrentEntry()) != null ? currentEntry : previousEntry;
- }
-
- @Override
- public final long getNextIndex() {
- return currentReader.getNextIndex();
- }
-
- @Override
- public final void reset() {
- previousEntry = null;
- currentReader.close();
-
- currentSegment = journal.getFirstSegment();
- currentReader = currentSegment.createReader();
- }
-
- @Override
- public final void reset(long index) {
- // If the current segment is not open, it has been replaced. Reset the segments.
- if (!currentSegment.isOpen()) {
- reset();
+
+ @Override
+ public final long getFirstIndex() {
+ return journal.getFirstSegment().index();
}
- final var nextIndex = currentReader.getNextIndex();
- if (index < nextIndex) {
- rewind(index);
- } else if (index > nextIndex) {
- forward(index);
- } else {
- currentReader.reset(index);
+ @Override
+ public final long getCurrentIndex() {
+ return currentEntry != null ? currentEntry.index() : 0;
}
- }
-
- /**
- * Rewinds the journal to the given index.
- */
- private void rewind(long index) {
- if (currentSegment.index() >= index) {
- JournalSegment<E> segment = journal.getSegment(index - 1);
- if (segment != null) {
- currentReader.close();
- currentSegment = segment;
- currentReader = currentSegment.createReader();
- }
+ @Override
+ public final Indexed<E> getCurrentEntry() {
+ return currentEntry;
}
- currentReader.reset(index);
- previousEntry = currentReader.getCurrentEntry();
- }
+ @Override
+ public final long getNextIndex() {
+ return nextIndex;
+ }
+
+ @Override
+ public final void reset() {
+ currentReader.close();
- /**
- * Fast forwards the journal to the given index.
- */
- private void forward(long index) {
- while (getNextIndex() < index && tryNext() != null) {
- // Nothing else
+ currentSegment = journal.getFirstSegment();
+ currentReader = currentSegment.createReader();
+ nextIndex = currentSegment.index();
+ currentEntry = null;
}
- }
-
- @Override
- public Indexed<E> tryNext() {
- final var current = currentReader.getCurrentEntry();
- final var next = currentReader.tryNext();
- if (next != null) {
- previousEntry = current;
- return next;
+
+ @Override
+ public final void reset(final long index) {
+ // If the current segment is not open, it has been replaced. Reset the segments.
+ if (!currentSegment.isOpen()) {
+ reset();
+ }
+
+ if (index < nextIndex) {
+ rewind(index);
+ } else if (index > nextIndex) {
+ while (index > nextIndex && tryNext() != null) {
+ // Nothing else
+ }
+ } else {
+ resetCurrentReader(index);
+ }
}
- final var nextSegment = journal.getNextSegment(currentSegment.index());
- if (nextSegment == null || nextSegment.index() != getNextIndex()) {
- return null;
+ private void resetCurrentReader(final long index) {
+ final var position = currentSegment.lookup(index - 1);
+ if (position != null) {
+ nextIndex = position.index();
+ currentReader.setPosition(position.position());
+ } else {
+ nextIndex = currentSegment.index();
+ currentReader.setPosition(JournalSegmentDescriptor.BYTES);
+ }
+ while (nextIndex < index && tryNext() != null) {
+ // Nothing else
+ }
}
- previousEntry = currentReader.getCurrentEntry();
- currentReader.close();
+ /**
+ * Rewinds the journal to the given index.
+ */
+ private void rewind(final long index) {
+ if (currentSegment.index() >= index) {
+ JournalSegment<E> segment = journal.getSegment(index - 1);
+ if (segment != null) {
+ currentReader.close();
+
+ currentSegment = segment;
+ currentReader = currentSegment.createReader();
+ }
+ }
+
+ resetCurrentReader(index);
+ }
- currentSegment = nextSegment;
- currentReader = currentSegment.createReader();
- return currentReader.tryNext();
- }
+ @Override
+ public Indexed<E> tryNext() {
+ var next = currentReader.readEntry(nextIndex);
+ if (next == null) {
+ final var nextSegment = journal.getNextSegment(currentSegment.index());
+ if (nextSegment == null || nextSegment.index() != nextIndex) {
+ return null;
+ }
+
+ currentReader.close();
+
+ currentSegment = nextSegment;
+ currentReader = currentSegment.createReader();
+ next = currentReader.readEntry(nextIndex);
+ if (next == null) {
+ return null;
+ }
+ }
+
+ nextIndex = nextIndex + 1;
+ currentEntry = next;
+ return next;
+ }
- @Override
- public final void close() {
- currentReader.close();
- journal.closeReader(this);
- }
+ @Override
+ public final void close() {
+ currentReader.close();
+ journal.closeReader(this);
+ }
}