import io.atomix.storage.journal.index.JournalIndex;
import io.netty.buffer.ByteBuf;
-import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+final class JournalSegmentWriter {
private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
- final @NonNull FileChannel channel;
+ private final FileWriter fileWriter;
final @NonNull JournalSegment segment;
private final @NonNull JournalIndex index;
final int maxSegmentSize;
private Long lastIndex;
private ByteBuf lastWritten;
- JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+ JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
final JournalIndex index) {
- this.channel = requireNonNull(channel);
+ this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.index = requireNonNull(index);
maxSegmentSize = segment.descriptor().maxSegmentSize();
this.maxEntrySize = maxEntrySize;
+ // adjust lastEntry value
+ reset(0);
}
- JournalSegmentWriter(final JournalSegmentWriter previous) {
- channel = previous.channel;
+ JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
segment = previous.segment;
index = previous.index;
maxSegmentSize = previous.maxSegmentSize;
lastWritten = previous.lastWritten;
lastIndex = previous.lastIndex;
currentPosition = previous.currentPosition;
+ this.fileWriter = requireNonNull(fileWriter);
}
/**
}
// allocate buffer and write data
- final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+ final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
writeBuffer.put(buf.nioBuffer());
// Compute the checksum for the entry.
// Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
- commitWrite(position, writeBuffer.rewind());
+ fileWriter.commitWrite(position, writeBuffer.rewind());
// Update the last entry with the correct index/term/length.
currentPosition = nextPosition;
return index;
}
- abstract ByteBuffer startWrite(int position, int size);
-
- abstract void commitWrite(int position, ByteBuffer entry);
-
/**
* Resets the head of the segment to the given index.
*
*/
final void reset(final long index) {
// acquire ownership of cache and make sure reader does not see anything we've done once we're done
- final var reader = reader();
- reader.invalidateCache();
+ final var fileReader = fileWriter.reader();
try {
- resetWithBuffer(reader, index);
+ resetWithBuffer(fileReader, index);
} finally {
// Make sure reader does not see anything we've done
- reader.invalidateCache();
+ fileReader.invalidateCache();
}
}
- abstract JournalSegmentReader reader();
-
- private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
+ private void resetWithBuffer(final FileReader fileReader, final long index) {
long nextIndex = segment.firstIndex();
// Clear the buffer indexes and acquire ownership of the buffer
currentPosition = JournalSegmentDescriptor.BYTES;
+ final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
reader.setPosition(JournalSegmentDescriptor.BYTES);
while (index == 0 || nextIndex <= index) {
}
// Zero the entry header at current channel position.
- writeEmptyHeader(currentPosition);
+ fileWriter.writeEmptyHeader(currentPosition);
}
- /**
- * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
- *
- * @param position position to write to
- */
- abstract void writeEmptyHeader(int position);
-
/**
* Flushes written entries to disk.
*/
- abstract void flush();
+ void flush() {
+ fileWriter.flush();
+ }
/**
* Closes this writer.
*/
- abstract void close();
+ void close() {
+ fileWriter.close();
+ }
/**
* Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
*
* @return the mapped buffer underlying the segment writer, or {@code null}.
*/
- abstract @Nullable MappedByteBuffer buffer();
+ @Nullable MappedByteBuffer buffer() {
+ return fileWriter.buffer();
+ }
- abstract @NonNull MappedJournalSegmentWriter toMapped();
+ @NonNull JournalSegmentWriter toMapped() {
+ final var newWriter = fileWriter.toMapped();
+ return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+ }
- abstract @NonNull DiskJournalSegmentWriter toFileChannel();
+ @NonNull JournalSegmentWriter toFileChannel() {
+ final var newWriter = fileWriter.toDisk();
+ return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+ }
}