Unify Disk segment reading
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
index c46b55cac8ecf4bac6098647b3964b696b8d3ba6..e51de40687c9723ff74dc3340975f2260875003a 100644 (file)
@@ -19,18 +19,14 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 
 import com.esotericsoftware.kryo.KryoException;
-import com.google.common.annotations.VisibleForTesting;
+import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
-import org.eclipse.jdt.annotation.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Segment writer.
@@ -48,37 +44,34 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
-  private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
 
-  private final ByteBuffer memory;
+  private final JournalSegmentReader<E> reader;
+  private final ByteBuffer buffer;
+
   private Indexed<E> lastEntry;
   private long currentPosition;
 
-  DiskJournalSegmentWriter(
-      FileChannel channel,
-      JournalSegment<E> segment,
-      int maxEntrySize,
-      JournalIndex index,
-      JournalSerdes namespace) {
+  DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+          final JournalIndex index, final JournalSerdes namespace) {
     super(channel, segment, maxEntrySize, index, namespace);
-    memory = allocMemory(maxEntrySize);
+
+    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+    final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+    reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
     reset(0);
   }
 
-  DiskJournalSegmentWriter(JournalSegmentWriter<E> previous, int position) {
+  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous, final int position) {
     super(previous);
-    memory = allocMemory(maxEntrySize);
+
+    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+    final var fileReader = new DiskFileReader(segment.file().file().toPath(), channel, maxSegmentSize, maxEntrySize);
+    reader = new JournalSegmentReader<>(segment, fileReader, maxEntrySize, namespace);
     lastEntry = previous.getLastEntry();
     currentPosition = position;
   }
 
-  private static ByteBuffer allocMemory(int maxEntrySize) {
-    final var buf = ByteBuffer.allocate((maxEntrySize + HEADER_BYTES) * 2);
-    buf.limit(0);
-    return buf;
-  }
-
   @Override
   MappedByteBuffer buffer() {
     return null;
@@ -96,109 +89,36 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   void reset(final long index) {
-      long nextIndex = firstIndex;
-
-      // Clear the buffer indexes.
-      currentPosition = JournalSegmentDescriptor.BYTES;
-
+      // acquire ownership of cache and make sure reader does not see anything we've done once we're done
+      reader.invalidateCache();
       try {
-          // Clear memory buffer and read fist chunk
-          channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
-          memory.flip();
-
-          while (index == 0 || nextIndex <= index) {
-              final var entry = prepareNextEntry(channel, memory, maxEntrySize);
-              if (entry == null) {
-                  break;
-              }
-
-              final var bytes = entry.bytes();
-              final var length = bytes.remaining();
-              try {
-                  lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
-              } catch (KryoException e) {
-                  // No-op, position is only updated on success
-                  LOG.debug("Failed to deserialize entry", e);
-                  break;
-              }
-
-              this.index.index(nextIndex, (int) currentPosition);
-              nextIndex++;
-
-              // Update the current position for indexing.
-              currentPosition = currentPosition + HEADER_BYTES + length;
-              memory.position(memory.position() + length);
-          }
-      } catch (IOException e) {
-          throw new StorageException(e);
+          resetWithBuffer(index);
+      } finally {
+          // Make sure reader does not see anything we've done
+          reader.invalidateCache();
       }
   }
 
-  @VisibleForTesting
-  static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
-          final int maxEntrySize) throws IOException {
-      int remaining = memory.remaining();
-      boolean compacted;
-      if (remaining < HEADER_BYTES) {
-          // We do not have the header available. Move the pointer and read.
-          channel.read(memory.compact());
-          remaining = memory.flip().remaining();
-          if (remaining < HEADER_BYTES) {
-              // could happen with mis-padded segment
-              return null;
-          }
-          compacted = true;
-      } else {
-          compacted = false;
-      }
+  private void resetWithBuffer(final long index) {
+      long nextIndex = firstIndex;
 
-      int length;
-      while (true) {
-          length = memory.mark().getInt();
-          if (length < 1 || length > maxEntrySize) {
-              // Invalid length,
-              memory.reset();
-              return null;
-          }
+      // Clear the buffer indexes and acquire ownership of the buffer
+      currentPosition = JournalSegmentDescriptor.BYTES;
+      reader.setPosition(JournalSegmentDescriptor.BYTES);
 
-          if (remaining >= Integer.BYTES + length) {
-              // Fast path: we have the entry properly positioned
+      while (index == 0 || nextIndex <= index) {
+          final var entry = reader.readEntry(nextIndex);
+          if (entry == null) {
               break;
           }
 
-          // Not enough data for entry, to header start
-          memory.reset();
-          if (compacted) {
-              // we have already compacted the buffer, there is just not enough data
-              return null;
-          }
+          lastEntry = entry;
+          this.index.index(nextIndex, (int) currentPosition);
+          nextIndex++;
 
-          // Try to read more data and check again
-          channel.read(memory.compact());
-          remaining = memory.flip().remaining();
-          compacted = true;
+          // Update the current position for indexing.
+          currentPosition = currentPosition + HEADER_BYTES + entry.size();
       }
-
-      // Read the checksum of the entry.
-      final int checksum = memory.getInt();
-
-      // Slice off the entry's bytes
-      final var entryBytes = memory.slice();
-      entryBytes.limit(length);
-
-      // Compute the checksum for the entry bytes.
-      final var crc32 = new CRC32();
-      crc32.update(entryBytes);
-
-      // If the stored checksum does not equal the computed checksum, do not proceed further
-      final var computed = (int) crc32.getValue();
-      if (checksum != computed) {
-          LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
-          memory.reset();
-          return null;
-      }
-
-      return new SegmentEntry(checksum, entryBytes.rewind());
   }
 
   @Override
@@ -208,54 +128,52 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
 
   @Override
   @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(T entry) {
-    // Store the entry index.
-    final long index = getNextIndex();
-
-    // Serialize the entry.
-    try {
-      namespace.serialize(entry, memory.clear().position(HEADER_BYTES));
-    } catch (KryoException e) {
-      throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-    }
-    memory.flip();
+  <T extends E> Indexed<T> append(final T entry) {
+      // Store the entry index.
+      final long index = getNextIndex();
 
-    final int length = memory.limit() - HEADER_BYTES;
+      // Serialize the entry.
+      try {
+          namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
+      } catch (KryoException e) {
+          throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+      }
+      buffer.flip();
 
-    // Ensure there's enough space left in the buffer to store the entry.
-    if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
-      throw new BufferOverflowException();
-    }
+      final int length = buffer.limit() - HEADER_BYTES;
+      // Ensure there's enough space left in the buffer to store the entry.
+      if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
+          throw new BufferOverflowException();
+      }
 
-    // If the entry length exceeds the maximum entry size then throw an exception.
-    if (length > maxEntrySize) {
-      throw new StorageException.TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
-    }
+      // If the entry length exceeds the maximum entry size then throw an exception.
+      if (length > maxEntrySize) {
+          throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
+      }
 
-    // Compute the checksum for the entry.
-    final CRC32 crc32 = new CRC32();
-    crc32.update(memory.array(), HEADER_BYTES, memory.limit() - HEADER_BYTES);
-    final long checksum = crc32.getValue();
+      // Compute the checksum for the entry.
+      final var crc32 = new CRC32();
+      crc32.update(buffer.slice(HEADER_BYTES, length));
 
-    // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-    memory.putInt(0, length).putInt(Integer.BYTES, (int) checksum);
-    try {
-      channel.write(memory, currentPosition);
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
+      // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
+      buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+      try {
+          channel.write(buffer, currentPosition);
+      } catch (IOException e) {
+          throw new StorageException(e);
+      }
 
-    // Update the last entry with the correct index/term/length.
-    Indexed<E> indexedEntry = new Indexed<>(index, entry, length);
-    this.lastEntry = indexedEntry;
-    this.index.index(index, (int) currentPosition);
+      // Update the last entry with the correct index/term/length.
+      final var indexedEntry = new Indexed<E>(index, entry, length);
+      lastEntry = indexedEntry;
+      this.index.index(index, (int) currentPosition);
 
-    currentPosition = currentPosition + HEADER_BYTES + length;
-    return (Indexed<T>) indexedEntry;
+      currentPosition = currentPosition + HEADER_BYTES + length;
+      return (Indexed<T>) indexedEntry;
   }
 
   @Override
-  void truncate(long index) {
+  void truncate(final long index) {
     // If the index is greater than or equal to the last index, skip the truncate.
     if (index >= getLastIndex()) {
       return;