Factor out FileWriter interface
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
index c2e0a258c9c4a650b62c73b548a5dc49f61e938b..f00e0daddd542aa42d0db489b075f85d514dc198 100644 (file)
@@ -20,19 +20,17 @@ import static java.util.Objects.requireNonNull;
 
 import io.atomix.storage.journal.index.JournalIndex;
 import io.netty.buffer.ByteBuf;
-import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+final class JournalSegmentWriter {
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
 
-    final @NonNull FileChannel channel;
+    private final FileWriter fileWriter;
     final @NonNull JournalSegment segment;
     private final @NonNull JournalIndex index;
     final int maxSegmentSize;
@@ -42,17 +40,18 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
     private Long lastIndex;
     private ByteBuf lastWritten;
 
-    JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+    JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
             final JournalIndex index) {
-        this.channel = requireNonNull(channel);
+        this.fileWriter = requireNonNull(fileWriter);
         this.segment = requireNonNull(segment);
         this.index = requireNonNull(index);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
+        // adjust lastEntry value
+        reset(0);
     }
 
-    JournalSegmentWriter(final JournalSegmentWriter previous) {
-        channel = previous.channel;
+    JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
         segment = previous.segment;
         index = previous.index;
         maxSegmentSize = previous.maxSegmentSize;
@@ -60,6 +59,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
         lastWritten = previous.lastWritten;
         lastIndex = previous.lastIndex;
         currentPosition = previous.currentPosition;
+        this.fileWriter = requireNonNull(fileWriter);
     }
 
     /**
@@ -114,7 +114,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
         }
 
         // allocate buffer and write data
-        final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+        final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
         writeBuffer.put(buf.nioBuffer());
 
         // Compute the checksum for the entry.
@@ -123,7 +123,7 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
 
         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
         writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-        commitWrite(position, writeBuffer.rewind());
+        fileWriter.commitWrite(position, writeBuffer.rewind());
 
         // Update the last entry with the correct index/term/length.
         currentPosition = nextPosition;
@@ -134,10 +134,6 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
         return index;
     }
 
-    abstract ByteBuffer startWrite(int position, int size);
-
-    abstract void commitWrite(int position, ByteBuffer entry);
-
     /**
      * Resets the head of the segment to the given index.
      *
@@ -145,23 +141,21 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
      */
     final void reset(final long index) {
         // acquire ownership of cache and make sure reader does not see anything we've done once we're done
-        final var reader = reader();
-        reader.invalidateCache();
+        final var fileReader = fileWriter.reader();
         try {
-            resetWithBuffer(reader, index);
+            resetWithBuffer(fileReader, index);
         } finally {
             // Make sure reader does not see anything we've done
-            reader.invalidateCache();
+            fileReader.invalidateCache();
         }
     }
 
-    abstract JournalSegmentReader reader();
-
-    private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
+    private void resetWithBuffer(final FileReader fileReader, final long index) {
         long nextIndex = segment.firstIndex();
 
         // Clear the buffer indexes and acquire ownership of the buffer
         currentPosition = JournalSegmentDescriptor.BYTES;
+        final var reader = new JournalSegmentReader(segment, fileReader, maxEntrySize);
         reader.setPosition(JournalSegmentDescriptor.BYTES);
 
         while (index == 0 || nextIndex <= index) {
@@ -207,25 +201,22 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
         }
 
         // Zero the entry header at current channel position.
-        writeEmptyHeader(currentPosition);
+        fileWriter.writeEmptyHeader(currentPosition);
     }
 
-    /**
-     * Write {@link SegmentEntry#HEADER_BYTES} worth of zeroes at specified position.
-     *
-     * @param position position to write to
-     */
-    abstract void writeEmptyHeader(int position);
-
     /**
      * Flushes written entries to disk.
      */
-    abstract void flush();
+    void flush() {
+        fileWriter.flush();
+    }
 
     /**
      * Closes this writer.
      */
-    abstract void close();
+    void close() {
+        fileWriter.close();
+    }
 
     /**
      * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
@@ -233,9 +224,17 @@ abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, Map
      *
      * @return the mapped buffer underlying the segment writer, or {@code null}.
      */
-    abstract @Nullable MappedByteBuffer buffer();
+    @Nullable MappedByteBuffer buffer() {
+        return fileWriter.buffer();
+    }
 
-    abstract @NonNull MappedJournalSegmentWriter toMapped();
+    @NonNull JournalSegmentWriter toMapped() {
+        final var newWriter = fileWriter.toMapped();
+        return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+    }
 
-    abstract @NonNull DiskJournalSegmentWriter toFileChannel();
+    @NonNull JournalSegmentWriter toFileChannel() {
+        final var newWriter = fileWriter.toDisk();
+        return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+    }
 }