Centralize JournalSegmentWriter.append()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
index d3aa3332c7a56eddd71c4f36193398e386d78323..266320127b214056942380d9d5f9a73437242557 100644 (file)
@@ -18,15 +18,11 @@ package io.atomix.storage.journal;
 
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 
-import com.esotericsoftware.kryo.KryoException;
-import io.atomix.storage.journal.StorageException.TooLarge;
 import io.atomix.storage.journal.index.JournalIndex;
 import java.io.IOException;
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.zip.CRC32;
 
 /**
  * Segment writer.
@@ -44,117 +40,85 @@ import java.util.zip.CRC32;
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
-  private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
-
-  private final JournalSegmentReader<E> reader;
-  private final ByteBuffer buffer;
-
-  DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-          final JournalIndex index, final JournalSerdes namespace) {
-    super(channel, segment, maxEntrySize, index, namespace);
-
-    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-    reader = new JournalSegmentReader<>(segment,
-        new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
-    reset(0);
-  }
-
-  DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
-    super(previous);
-
-    buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-    reader = new JournalSegmentReader<>(segment,
-        new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
-  }
-
-  @Override
-  MappedByteBuffer buffer() {
-    return null;
-  }
-
-  @Override
-  MappedJournalSegmentWriter<E> toMapped() {
-    return new MappedJournalSegmentWriter<>(this);
-  }
-
-  @Override
-  DiskJournalSegmentWriter<E> toFileChannel() {
-    return this;
-  }
-
-  @Override
-  JournalSegmentReader<E> reader() {
-    return reader;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  <T extends E> Indexed<T> append(final T entry) {
-      // Store the entry index.
-      final long index = getNextIndex();
-
-      // Serialize the entry.
-      try {
-          namespace.serialize(entry, buffer.clear().position(HEADER_BYTES));
-      } catch (KryoException e) {
-          throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
-      }
-      buffer.flip();
-
-      final int length = buffer.limit() - HEADER_BYTES;
-      // Ensure there's enough space left in the buffer to store the entry.
-      if (maxSegmentSize - currentPosition < length + HEADER_BYTES) {
-          throw new BufferOverflowException();
-      }
-
-      // If the entry length exceeds the maximum entry size then throw an exception.
-      if (length > maxEntrySize) {
-          throw new TooLarge("Entry size " + length + " exceeds maximum allowed bytes (" + maxEntrySize + ")");
-      }
-
-      // Compute the checksum for the entry.
-      final var crc32 = new CRC32();
-      crc32.update(buffer.slice(HEADER_BYTES, length));
-
-      // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-      buffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-      try {
-          channel.write(buffer, currentPosition);
-      } catch (IOException e) {
-          throw new StorageException(e);
-      }
-
-      // Update the last entry with the correct index/term/length.
-      final var indexedEntry = new Indexed<E>(index, entry, length);
-      lastEntry = indexedEntry;
-      this.index.index(index, currentPosition);
-
-      currentPosition = currentPosition + HEADER_BYTES + length;
-      return (Indexed<T>) indexedEntry;
-  }
-
-  @Override
-  void writeEmptyHeader(final int position) {
-    try {
-      channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
-    } catch (IOException e) {
-      throw new StorageException(e);
+    private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
+
+    private final JournalSegmentReader<E> reader;
+    private final ByteBuffer buffer;
+
+    DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
+        final JournalIndex index, final JournalSerdes namespace) {
+        super(channel, segment, maxEntrySize, index, namespace);
+
+        buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+        reader = new JournalSegmentReader<>(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+        reset(0);
+    }
+
+    DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+        super(previous);
+
+        buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
+        reader = new JournalSegmentReader<>(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+    }
+
+    @Override
+    MappedByteBuffer buffer() {
+        return null;
+    }
+
+    @Override
+    MappedJournalSegmentWriter<E> toMapped() {
+        return new MappedJournalSegmentWriter<>(this);
+    }
+
+    @Override
+    DiskJournalSegmentWriter<E> toFileChannel() {
+        return this;
+    }
+
+    @Override
+    JournalSegmentReader<E> reader() {
+        return reader;
     }
-  }
-
-  @Override
-  void flush() {
-    try {
-      if (channel.isOpen()) {
-        channel.force(true);
-      }
-    } catch (IOException e) {
-      throw new StorageException(e);
+
+    @Override
+    ByteBuffer startWrite(final int position, final int size) {
+        return buffer.clear().slice(0, size);
+    }
+
+    @Override
+    void commitWrite(final int position, final ByteBuffer entry) {
+        try {
+            channel.write(entry, position);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
     }
-  }
 
-  @Override
-  void close() {
-    flush();
-  }
+    @Override
+    void writeEmptyHeader(final int position) {
+        try {
+            channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    void flush() {
+        try {
+            if (channel.isOpen()) {
+                channel.force(true);
+            }
+        } catch (IOException e) {
+            throw new StorageException(e);
+        }
+    }
+
+    @Override
+    void close() {
+        flush();
+    }
 }