<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jdt</groupId>
+ <artifactId>org.eclipse.jdt.annotation</artifactId>
+ </dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-class FileChannelJournalSegmentWriter<E> implements JournalWriter<E> {
+final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
private final FileChannel channel;
- private final JournalSegment<E> segment;
- private final int maxEntrySize;
- private final JournalIndex index;
- private final JournalSerdes namespace;
private final ByteBuffer memory;
- private final long firstIndex;
private Indexed<E> lastEntry;
private long currentPosition;
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
+ super(segment, maxEntrySize, index, namespace);
this.channel = channel;
- this.segment = segment;
- this.maxEntrySize = maxEntrySize;
- this.index = index;
this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
memory.limit(0);
- this.namespace = namespace;
- this.firstIndex = segment.index();
reset(0);
}
+ @Override
+ MappedByteBuffer buffer() {
+ return null;
+ }
+
@Override
public void reset(long index) {
long nextIndex = firstIndex;
return (Indexed<T>) indexedEntry;
}
- @Override
- public void commit(long index) {
-
- }
-
@Override
public void truncate(long index) {
// If the index is greater than or equal to the last index, skip the truncate.
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import static java.util.Objects.requireNonNull;
+
+import io.atomix.storage.journal.index.JournalIndex;
+import java.nio.MappedByteBuffer;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+
+abstract sealed class JournalSegmentWriter<E> implements JournalWriter<E>
+ permits FileChannelJournalSegmentWriter, MappedJournalSegmentWriter {
+ final @NonNull JournalSegment<E> segment;
+ final int maxEntrySize;
+ final @NonNull JournalIndex index;
+ final @NonNull JournalSerdes namespace;
+ final long firstIndex;
+
+ JournalSegmentWriter(final JournalSegment<E> segment, final int maxEntrySize, final JournalIndex index,
+ final JournalSerdes namespace) {
+ this.segment = requireNonNull(segment);
+ this.maxEntrySize = maxEntrySize;
+ this.index = requireNonNull(index);
+ this.namespace = requireNonNull(namespace);
+ this.firstIndex = segment.index();
+ }
+
+ @Override
+ public final void commit(final long index) {
+ // FIXME: CONTROLLER-2098: eliminate the need for this method
+ }
+
+ /**
+ * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
+ * buffer.
+ *
+ * @return the mapped buffer underlying the segment writer, or {@code null}.
+ */
+ abstract @Nullable MappedByteBuffer buffer();
+}
/**
* Mappable log segment writer.
*/
-class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
+final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
private final FileChannel channel;
private final JournalSegment<E> segment;
private final int maxEntrySize;
private final JournalIndex index;
private final JournalSerdes namespace;
- private JournalWriter<E> writer;
+ private JournalSegmentWriter<E> writer;
MappableJournalSegmentWriter(
FileChannel channel,
}
try {
- JournalWriter<E> writer = this.writer;
+ JournalSegmentWriter<E> writer = this.writer;
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
writer.close();
*/
void unmap() {
if (writer instanceof MappedJournalSegmentWriter) {
- JournalWriter<E> writer = this.writer;
+ JournalSegmentWriter<E> writer = this.writer;
this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
writer.close();
}
}
MappedByteBuffer buffer() {
- JournalWriter<E> writer = this.writer;
- if (writer instanceof MappedJournalSegmentWriter) {
- return ((MappedJournalSegmentWriter<E>) writer).buffer();
- }
- return null;
+ return writer.buffer();
}
/**
*/
package io.atomix.storage.journal;
+import static java.util.Objects.requireNonNull;
+
import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.NonNull;
/**
* Segment writer.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-class MappedJournalSegmentWriter<E> implements JournalWriter<E> {
- private final MappedByteBuffer mappedBuffer;
+final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+ private final @NonNull MappedByteBuffer mappedBuffer;
private final ByteBuffer buffer;
- private final JournalSegment<E> segment;
- private final int maxEntrySize;
- private final JournalIndex index;
- private final JournalSerdes namespace;
- private final long firstIndex;
+
private Indexed<E> lastEntry;
MappedJournalSegmentWriter(
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
- this.mappedBuffer = buffer;
+ super(segment, maxEntrySize, index, namespace);
+ this.mappedBuffer = requireNonNull(buffer);
this.buffer = buffer.slice();
- this.segment = segment;
- this.maxEntrySize = maxEntrySize;
- this.index = index;
- this.namespace = namespace;
- this.firstIndex = segment.index();
reset(0);
}
- /**
- * Returns the mapped buffer underlying the segment writer.
- *
- * @return the mapped buffer underlying the segment writer
- */
- MappedByteBuffer buffer() {
+ @Override
+ @NonNull MappedByteBuffer buffer() {
return mappedBuffer;
}
return (Indexed<T>) indexedEntry;
}
- @Override
- public void commit(long index) {
-
- }
@Override
public void truncate(long index) {