Import atomix/{storage,utils}
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java
new file mode 100644 (file)
index 0000000..fe69711
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.atomix.storage.StorageException;
+import io.atomix.storage.journal.index.JournalIndex;
+import io.atomix.storage.journal.index.Position;
+import io.atomix.utils.serializer.Namespace;
+
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.NoSuchElementException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * Log segment reader.
+ *
+ * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
+ */
+class FileChannelJournalSegmentReader<E> implements JournalReader<E> {
+  private final FileChannel channel;
+  private final int maxEntrySize;
+  private final JournalIndex index;
+  private final Namespace namespace;
+  private final ByteBuffer memory;
+  private final long firstIndex;
+  private Indexed<E> currentEntry;
+  private Indexed<E> nextEntry;
+
+  FileChannelJournalSegmentReader(
+      FileChannel channel,
+      JournalSegment<E> segment,
+      int maxEntrySize,
+      JournalIndex index,
+      Namespace namespace) {
+    this.channel = channel;
+    this.maxEntrySize = maxEntrySize;
+    this.index = index;
+    this.namespace = namespace;
+    this.memory = ByteBuffer.allocate((maxEntrySize + Integer.BYTES + Integer.BYTES) * 2);
+    this.firstIndex = segment.index();
+    reset();
+  }
+
+  @Override
+  public long getFirstIndex() {
+    return firstIndex;
+  }
+
+  @Override
+  public long getCurrentIndex() {
+    return currentEntry != null ? currentEntry.index() : 0;
+  }
+
+  @Override
+  public Indexed<E> getCurrentEntry() {
+    return currentEntry;
+  }
+
+  @Override
+  public long getNextIndex() {
+    return currentEntry != null ? currentEntry.index() + 1 : firstIndex;
+  }
+
+  @Override
+  public void reset(long index) {
+    reset();
+    Position position = this.index.lookup(index - 1);
+    if (position != null) {
+      currentEntry = new Indexed<>(position.index() - 1, null, 0);
+      try {
+        channel.position(position.position());
+        memory.clear().flip();
+      } catch (IOException e) {
+        throw new StorageException(e);
+      }
+      readNext();
+    }
+    while (getNextIndex() < index && hasNext()) {
+      next();
+    }
+  }
+
+  @Override
+  public void reset() {
+    try {
+      channel.position(JournalSegmentDescriptor.BYTES);
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+    memory.clear().limit(0);
+    currentEntry = null;
+    nextEntry = null;
+    readNext();
+  }
+
+  @Override
+  public boolean hasNext() {
+    // If the next entry is null, check whether a next entry exists.
+    if (nextEntry == null) {
+      readNext();
+    }
+    return nextEntry != null;
+  }
+
+  @Override
+  public Indexed<E> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    // Set the current entry to the next entry.
+    currentEntry = nextEntry;
+
+    // Reset the next entry to null.
+    nextEntry = null;
+
+    // Read the next entry in the segment.
+    readNext();
+
+    // Return the current entry.
+    return currentEntry;
+  }
+
+  /**
+   * Reads the next entry in the segment.
+   */
+  @SuppressWarnings("unchecked")
+  private void readNext() {
+    // Compute the index of the next entry in the segment.
+    final long index = getNextIndex();
+
+    try {
+      // Read more bytes from the segment if necessary.
+      if (memory.remaining() < maxEntrySize) {
+        long position = channel.position() + memory.position();
+        channel.position(position);
+        memory.clear();
+        channel.read(memory);
+        channel.position(position);
+        memory.flip();
+      }
+
+      // Mark the buffer so it can be reset if necessary.
+      memory.mark();
+
+      try {
+        // Read the length of the entry.
+        final int length = memory.getInt();
+
+        // If the buffer length is zero then return.
+        if (length <= 0 || length > maxEntrySize) {
+          memory.reset().limit(memory.position());
+          nextEntry = null;
+          return;
+        }
+
+        // Read the checksum of the entry.
+        long checksum = memory.getInt() & 0xFFFFFFFFL;
+
+        // Compute the checksum for the entry bytes.
+        final Checksum crc32 = new CRC32();
+        crc32.update(memory.array(), memory.position(), length);
+
+        // If the stored checksum equals the computed checksum, return the entry.
+        if (checksum == crc32.getValue()) {
+          int limit = memory.limit();
+          memory.limit(memory.position() + length);
+          E entry = namespace.deserialize(memory);
+          memory.limit(limit);
+          nextEntry = new Indexed<>(index, entry, length);
+        } else {
+          memory.reset().limit(memory.position());
+          nextEntry = null;
+        }
+      } catch (BufferUnderflowException e) {
+        memory.reset().limit(memory.position());
+        nextEntry = null;
+      }
+    } catch (IOException e) {
+      throw new StorageException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Do nothing. The parent reader manages the channel.
+  }
+}