*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
+final class FileChannelJournalSegmentReader<E> extends JournalSegmentReader<E> {
private final FileChannel channel;
- private final int maxEntrySize;
- private final JournalIndex index;
- private final JournalSerdes namespace;
private final ByteBuffer memory;
- private final long firstIndex;
private Indexed<E> currentEntry;
private Indexed<E> nextEntry;
private long currentPosition;
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
+ super(segment, maxEntrySize, index, namespace);
this.channel = channel;
- this.maxEntrySize = maxEntrySize;
- this.index = index;
- this.namespace = namespace;
this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
- this.firstIndex = segment.index();
reset();
}
- @Override
- public long getFirstIndex() {
- return firstIndex;
- }
-
@Override
public long getCurrentIndex() {
return currentEntry != null ? currentEntry.index() : 0;
throw new StorageException(e);
}
}
-
- @Override
- public void close() {
- // Do nothing. The parent reader manages the channel.
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.atomix.storage.journal.index.JournalIndex;
+
+abstract sealed class JournalSegmentReader<E> implements JournalReader<E>
+ permits FileChannelJournalSegmentReader, MappedJournalSegmentReader {
+ final int maxEntrySize;
+ final JournalIndex index;
+ final JournalSerdes namespace;
+ final long firstIndex;
+
+ JournalSegmentReader(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
+ final JournalSerdes namespace) {
+ this.maxEntrySize = maxEntrySize;
+ this.index = requireNonNull(index);
+ this.namespace = requireNonNull(namespace);
+ this.firstIndex = segment.index();
+ }
+
+ @Override
+ public final long getFirstIndex() {
+ return firstIndex;
+ }
+
+ @Override
+ public final void close() {
+ // FIXME: CONTROLLER-2098: remove this method
+ }
+}
/**
* Mappable log segment reader.
*/
-class MappableJournalSegmentReader<E> implements JournalReader<E> {
+final class MappableJournalSegmentReader<E> implements JournalReader<E> {
private final JournalSegment<E> segment;
private final FileChannel channel;
private final int maxEntrySize;
private final JournalIndex index;
private final JournalSerdes namespace;
- private JournalReader<E> reader;
+ private JournalSegmentReader<E> reader;
MappableJournalSegmentReader(
FileChannel channel,
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class MappedJournalSegmentReader<E> implements JournalReader<E> {
+final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
private final ByteBuffer buffer;
- private final int maxEntrySize;
- private final JournalIndex index;
- private final JournalSerdes namespace;
- private final long firstIndex;
private Indexed<E> currentEntry;
private Indexed<E> nextEntry;
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
+ super(segment, maxEntrySize, index, namespace);
this.buffer = buffer.slice();
- this.maxEntrySize = maxEntrySize;
- this.index = index;
- this.namespace = namespace;
- this.firstIndex = segment.index();
reset();
}
- @Override
- public long getFirstIndex() {
- return firstIndex;
- }
-
@Override
public long getCurrentIndex() {
return currentEntry != null ? currentEntry.index() : 0;
nextEntry = null;
}
}
-
- @Override
- public void close() {
- // Do nothing. The writer is responsible for cleaning the mapped buffer.
- }
}