Remove StorageLevel.MEMORY
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentWriter.java
index fa8b02bded1cdabff58370cf66e12aa12a9318da..4d3aa783637828ef189c5af5ec54e51d5698b0f3 100644 (file)
@@ -44,7 +44,6 @@ import java.util.zip.CRC32;
 final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[Integer.BYTES + Integer.BYTES]);
 
-  private final FileChannel channel;
   private final ByteBuffer memory;
   private Indexed<E> lastEntry;
   private long currentPosition;
@@ -55,20 +54,41 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
       int maxEntrySize,
       JournalIndex index,
       JournalSerdes namespace) {
-    super(segment, maxEntrySize, index, namespace);
-    this.channel = channel;
-    this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
-    memory.limit(0);
+    super(channel, segment, maxEntrySize, index, namespace);
+    memory = allocMemory(maxEntrySize);
     reset(0);
   }
 
+  FileChannelJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+    super(previous);
+    memory = allocMemory(maxEntrySize);
+    lastEntry = previous.getLastEntry();
+    currentPosition = position;
+  }
+
+  private static ByteBuffer allocMemory(int maxEntrySize) {
+    final var buf = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+    buf.limit(0);
+    return buf;
+  }
+
   @Override
   MappedByteBuffer buffer() {
     return null;
   }
 
   @Override
-  public void reset(long index) {
+  MappedJournalSegmentWriter<E> toMapped() {
+    return new MappedJournalSegmentWriter<>(this, (int) currentPosition);
+  }
+
+  @Override
+  FileChannelJournalSegmentWriter<E> toFileChannel() {
+    return this;
+  }
+
+  @Override
+  void reset(long index) {
     long nextIndex = firstIndex;
 
     // Clear the buffer indexes.
@@ -129,17 +149,17 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  public long getLastIndex() {
-    return lastEntry != null ? lastEntry.index() : segment.index() - 1;
+  long getLastIndex() {
+    return lastEntry != null ? lastEntry.index() : firstIndex - 1;
   }
 
   @Override
-  public Indexed<E> getLastEntry() {
+  Indexed<E> getLastEntry() {
     return lastEntry;
   }
 
   @Override
-  public long getNextIndex() {
+  long getNextIndex() {
     if (lastEntry != null) {
       return lastEntry.index() + 1;
     } else {
@@ -147,25 +167,9 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     }
   }
 
-  @Override
-  public void append(Indexed<E> entry) {
-    final long nextIndex = getNextIndex();
-
-    // If the entry's index is greater than the next index in the segment, skip some entries.
-    if (entry.index() > nextIndex) {
-      throw new IndexOutOfBoundsException("Entry index is not sequential");
-    }
-
-    // If the entry's index is less than the next index, truncate the segment.
-    if (entry.index() < nextIndex) {
-      truncate(entry.index() - 1);
-    }
-    append(entry.entry());
-  }
-
   @Override
   @SuppressWarnings("unchecked")
-  public <T extends E> Indexed<T> append(T entry) {
+  <T extends E> Indexed<T> append(T entry) {
     // Store the entry index.
     final long index = getNextIndex();
 
@@ -182,7 +186,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     final int length = memory.limit() - (Integer.BYTES + Integer.BYTES);
 
     // Ensure there's enough space left in the buffer to store the entry.
-    if (segment.descriptor().maxSegmentSize() - currentPosition < length + Integer.BYTES + Integer.BYTES) {
+    if (maxSegmentSize - currentPosition < length + Integer.BYTES + Integer.BYTES) {
       throw new BufferOverflowException();
     }
 
@@ -215,7 +219,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  public void truncate(long index) {
+  void truncate(long index) {
     // If the index is greater than or equal to the last index, skip the truncate.
     if (index >= getLastIndex()) {
       return;
@@ -228,7 +232,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     this.index.truncate(index);
 
     try {
-      if (index < segment.index()) {
+      if (index < firstIndex) {
         // Reset the writer to the first entry.
         currentPosition = JournalSegmentDescriptor.BYTES;
       } else {
@@ -244,7 +248,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  public void flush() {
+  void flush() {
     try {
       if (channel.isOpen()) {
         channel.force(true);
@@ -255,7 +259,7 @@ final class FileChannelJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  public void close() {
+  void close() {
     flush();
   }
 }