atomic-storage: remove type dependency at segment level I/O
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / JournalSegmentWriter.java
index c7c035be65b06b187ea8f6f508a30be45812155f..c2e0a258c9c4a650b62c73b548a5dc49f61e938b 100644 (file)
@@ -18,8 +18,8 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
-import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
+import io.netty.buffer.ByteBuf;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -29,37 +29,36 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
 
     final @NonNull FileChannel channel;
-    final @NonNull JournalSegment<E> segment;
+    final @NonNull JournalSegment segment;
     private final @NonNull JournalIndex index;
-    final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
     final int maxEntrySize;
 
-    private Indexed<E> lastEntry;
     private int currentPosition;
+    private Long lastIndex;
+    private ByteBuf lastWritten;
 
-    JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-            final JournalIndex index, final JournalSerdes namespace) {
+    JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex index) {
         this.channel = requireNonNull(channel);
         this.segment = requireNonNull(segment);
         this.index = requireNonNull(index);
-        this.namespace = requireNonNull(namespace);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
     }
 
-    JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+    JournalSegmentWriter(final JournalSegmentWriter previous) {
         channel = previous.channel;
         segment = previous.segment;
         index = previous.index;
-        namespace = previous.namespace;
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
-        lastEntry = previous.lastEntry;
+        lastWritten = previous.lastWritten;
+        lastIndex = previous.lastIndex;
         currentPosition = previous.currentPosition;
     }
 
@@ -69,16 +68,16 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The last written index.
      */
     final long getLastIndex() {
-        return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
+        return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
     }
 
     /**
-     * Returns the last entry written.
+     * Returns the last data written.
      *
-     * @return The last entry written.
+     * @return The last data written.
      */
-    final Indexed<E> getLastEntry() {
-        return lastEntry;
+    final ByteBuf getLastWritten() {
+        return lastWritten == null ? null : lastWritten.slice();
     }
 
     /**
@@ -87,63 +86,52 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The next index to be written.
      */
     final long getNextIndex() {
-        return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
+        return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
     }
 
     /**
-     * Tries to append an entry to the journal.
+     * Tries to append a binary data to the journal.
      *
-     * @param entry The entry to append.
-     * @return The appended indexed entry, or {@code null} if there is not enough space available
+     * @param buf binary data to append
+     * @return The index of appended data, or {@code null} if segment has no space
      */
-    final <T extends E> @Nullable Indexed<T> append(final T entry) {
+    final Long append(final ByteBuf buf) {
+        final var length = buf.readableBytes();
+        if (length > maxEntrySize) {
+            throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
+                + maxEntrySize + ")");
+        }
+
         // Store the entry index.
         final long index = getNextIndex();
         final int position = currentPosition;
 
-        // Serialize the entry.
-        final int bodyPosition = position + HEADER_BYTES;
-        final int avail = maxSegmentSize - bodyPosition;
-        if (avail < 0) {
+        // check space available
+        final int nextPosition = position + HEADER_BYTES + length;
+        if (nextPosition >= maxSegmentSize) {
             LOG.trace("Not enough space for {} at {}", index, position);
             return null;
         }
 
-        final var writeLimit = Math.min(avail, maxEntrySize);
-        final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
-        try {
-            namespace.serialize(entry, diskEntry);
-        } catch (KryoException e) {
-            if (writeLimit != maxEntrySize) {
-                // We have not provided enough capacity, signal to roll to next segment
-                LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
-                return null;
-            }
-
-            // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
-            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
-        }
-
-        final int length = diskEntry.position() - HEADER_BYTES;
+        // allocate buffer and write data
+        final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+        writeBuffer.put(buf.nioBuffer());
 
         // Compute the checksum for the entry.
         final var crc32 = new CRC32();
-        crc32.update(diskEntry.flip().position(HEADER_BYTES));
+        crc32.update(writeBuffer.flip().position(HEADER_BYTES));
 
         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-        diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-        commitWrite(position, diskEntry.rewind());
+        writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+        commitWrite(position, writeBuffer.rewind());
 
         // Update the last entry with the correct index/term/length.
-        final var indexedEntry = new Indexed<E>(index, entry, length);
-        lastEntry = indexedEntry;
+        currentPosition = nextPosition;
+        lastWritten = buf;
+        lastIndex = index;
         this.index.index(index, position);
 
-        currentPosition = bodyPosition + length;
-
-        @SuppressWarnings("unchecked")
-        final var ugly = (Indexed<T>) indexedEntry;
-        return ugly;
+        return index;
     }
 
     abstract ByteBuffer startWrite(int position, int size);
@@ -167,9 +155,9 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         }
     }
 
-    abstract JournalSegmentReader<E> reader();
+    abstract JournalSegmentReader reader();
 
-    private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+    private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
         long nextIndex = segment.firstIndex();
 
         // Clear the buffer indexes and acquire ownership of the buffer
@@ -177,17 +165,18 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         reader.setPosition(JournalSegmentDescriptor.BYTES);
 
         while (index == 0 || nextIndex <= index) {
-            final var entry = reader.readEntry(nextIndex);
-            if (entry == null) {
+            final var buf = reader.readBytes(nextIndex);
+            if (buf == null) {
                 break;
             }
 
-            lastEntry = entry;
+            lastWritten = buf;
+            lastIndex = nextIndex;
             this.index.index(nextIndex, currentPosition);
             nextIndex++;
 
             // Update the current position for indexing.
-            currentPosition = currentPosition + HEADER_BYTES + entry.size();
+            currentPosition += HEADER_BYTES + buf.readableBytes();
         }
     }
 
@@ -202,8 +191,9 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
             return;
         }
 
-        // Reset the last entry.
-        lastEntry = null;
+        // Reset the last written
+        lastIndex = null;
+        lastWritten = null;
 
         // Truncate the index.
         this.index.truncate(index);
@@ -245,7 +235,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      */
     abstract @Nullable MappedByteBuffer buffer();
 
-    abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+    abstract @NonNull MappedJournalSegmentWriter toMapped();
 
-    abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
+    abstract @NonNull DiskJournalSegmentWriter toFileChannel();
 }