Improve disk entry access
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
index 3f6371781a4061caf2b0fcb7b43d21e26036aaff..62c5b4c7678cfc90e54d290628291804eb7580f1 100644 (file)
 package io.atomix.storage.journal;
 
 import com.esotericsoftware.kryo.KryoException;
+import com.google.common.annotations.VisibleForTesting;
 import io.atomix.storage.journal.index.JournalIndex;
-
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
 
 /**
@@ -132,13 +133,7 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
         currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
         memory.position(memory.position() + length);
 
-        // Read more bytes from the segment if necessary.
-        if (memory.remaining() < maxEntrySize) {
-          channel.read(memory.compact());
-          memory.flip();
-        }
-
-        length = memory.getInt();
+        length = prepareNextEntry(channel, memory);
       }
     } catch (BufferUnderflowException e) {
       // No-op, position is only updated on success
@@ -147,6 +142,44 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     }
   }
 
+  @VisibleForTesting
+  static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+      int remaining = memory.remaining();
+      boolean compacted;
+      if (remaining < ENTRY_HEADER_BYTES) {
+          // We do not have the header available. Move the pointer and read.
+          channel.read(memory.compact());
+          remaining = memory.flip().remaining();
+          if (remaining < ENTRY_HEADER_BYTES) {
+              // could happen with mis-padded segment
+              return 0;
+          }
+          compacted = true;
+      } else {
+          compacted = false;
+      }
+
+      while (true) {
+          final int length = memory.mark().getInt();
+          if (remaining >= Integer.BYTES + length) {
+              // Fast path: we have the entry properly positioned
+              return length;
+          }
+
+          // Not enough data for entry, to header start
+          memory.reset();
+          if (compacted) {
+              // we have already compacted the buffer, there is just not enough data
+              return 0;
+          }
+
+          // Try to read more data and check again
+          channel.read(memory.compact());
+          remaining = memory.flip().remaining();
+          compacted = true;
+      }
+  }
+
   @Override
   Indexed<E> getLastEntry() {
     return lastEntry;