Introduce SegmentEntry
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
index 62c5b4c7678cfc90e54d290628291804eb7580f1..cbbb7799162f135206b9a9d8e870ce88b7e4bb1d 100644 (file)
@@ -21,12 +21,14 @@ import com.google.common.annotations.VisibleForTesting;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.zip.CRC32;
+import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Segment writer.
@@ -44,6 +46,7 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+  private static final Logger LOG = LoggerFactory.getLogger(DiskJournalSegmentWriter.class);
   private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[ENTRY_HEADER_BYTES]);
 
   private final ByteBuffer memory;
@@ -90,60 +93,48 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
   }
 
   @Override
-  void reset(long index) {
-    long nextIndex = firstIndex;
-
-    // Clear the buffer indexes.
-    currentPosition = JournalSegmentDescriptor.BYTES;
-
-    try {
-      // Clear memory buffer and read fist chunk
-      channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
-      memory.flip();
-
-      // Read the entry length.
-      int length = memory.getInt();
-
-      // If the length is non-zero, read the entry.
-      while (0 < length && length <= maxEntrySize && (index == 0 || nextIndex <= index)) {
-
-        // Read the checksum of the entry.
-        final long checksum = memory.getInt() & 0xFFFFFFFFL;
-
-        // Slice off the entry's bytes
-        final ByteBuffer entryBytes = memory.slice();
-        entryBytes.limit(length);
-
-        // Compute the checksum for the entry bytes.
-        final CRC32 crc32 = new CRC32();
-        crc32.update(entryBytes);
-
-        // If the stored checksum does not equal the computed checksum, do not proceed further
-        if (checksum != crc32.getValue()) {
-          break;
-        }
-
-        entryBytes.rewind();
-        final E entry = namespace.deserialize(entryBytes);
-        lastEntry = new Indexed<>(nextIndex, entry, length);
-        this.index.index(nextIndex, (int) currentPosition);
-        nextIndex++;
-
-        // Update the current position for indexing.
-        currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
-        memory.position(memory.position() + length);
-
-        length = prepareNextEntry(channel, memory);
+  void reset(final long index) {
+      long nextIndex = firstIndex;
+
+      // Clear the buffer indexes.
+      currentPosition = JournalSegmentDescriptor.BYTES;
+
+      try {
+          // Clear memory buffer and read fist chunk
+          channel.read(memory.clear(), JournalSegmentDescriptor.BYTES);
+          memory.flip();
+
+          while (index == 0 || nextIndex <= index) {
+              final var entry = prepareNextEntry(channel, memory, maxEntrySize);
+              if (entry == null) {
+                  break;
+              }
+
+              final var bytes = entry.bytes();
+              final var length = bytes.remaining();
+              try {
+                  lastEntry = new Indexed<>(nextIndex, namespace.<E>deserialize(bytes), length);
+              } catch (KryoException e) {
+                  // No-op, position is only updated on success
+                  LOG.debug("Failed to deserialize entry", e);
+                  break;
+              }
+
+              this.index.index(nextIndex, (int) currentPosition);
+              nextIndex++;
+
+              // Update the current position for indexing.
+              currentPosition = currentPosition + ENTRY_HEADER_BYTES + length;
+              memory.position(memory.position() + length);
+          }
+      } catch (IOException e) {
+          throw new StorageException(e);
       }
-    } catch (BufferUnderflowException e) {
-      // No-op, position is only updated on success
-    } catch (IOException e) {
-      throw new StorageException(e);
-    }
   }
 
   @VisibleForTesting
-  static int prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory) throws IOException {
+  static @Nullable SegmentEntry prepareNextEntry(final SeekableByteChannel channel, final ByteBuffer memory,
+          final int maxEntrySize) throws IOException {
       int remaining = memory.remaining();
       boolean compacted;
       if (remaining < ENTRY_HEADER_BYTES) {
@@ -152,25 +143,32 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
           remaining = memory.flip().remaining();
           if (remaining < ENTRY_HEADER_BYTES) {
               // could happen with mis-padded segment
-              return 0;
+              return null;
           }
           compacted = true;
       } else {
           compacted = false;
       }
 
+      int length;
       while (true) {
-          final int length = memory.mark().getInt();
+          length = memory.mark().getInt();
+          if (length < 1 || length > maxEntrySize) {
+              // Invalid length,
+              memory.reset();
+              return null;
+          }
+
           if (remaining >= Integer.BYTES + length) {
               // Fast path: we have the entry properly positioned
-              return length;
+              break;
           }
 
           // Not enough data for entry, to header start
           memory.reset();
           if (compacted) {
               // we have already compacted the buffer, there is just not enough data
-              return 0;
+              return null;
           }
 
           // Try to read more data and check again
@@ -178,6 +176,27 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
           remaining = memory.flip().remaining();
           compacted = true;
       }
+
+      // Read the checksum of the entry.
+      final int checksum = memory.getInt();
+
+      // Slice off the entry's bytes
+      final var entryBytes = memory.slice();
+      entryBytes.limit(length);
+
+      // Compute the checksum for the entry bytes.
+      final var crc32 = new CRC32();
+      crc32.update(entryBytes);
+
+      // If the stored checksum does not equal the computed checksum, do not proceed further
+      final var computed = (int) crc32.getValue();
+      if (checksum != computed) {
+          LOG.warn("Expected checksum {}, computed {}", Integer.toHexString(checksum), Integer.toHexString(computed));
+          memory.reset();
+          return null;
+      }
+
+      return new SegmentEntry(checksum, entryBytes.rewind());
   }
 
   @Override