atomic-storage: remove type dependency at segment level I/O 17/111217/3
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Wed, 3 Apr 2024 09:53:20 +0000 (12:53 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 18 Apr 2024 15:50:19 +0000 (17:50 +0200)
Due to object de- and serialization was served by segment
I/O artifacts it required these artifacts holding serializer
instance and being dependent on its type (generics).

Moving serialization to upper level (journal) allows segment
reader/writer operate on byte level only with no extra data
conversions. Serializer interface simplified for further
isolation and removal of Kryo serializer.

JIRA: CONTROLLER-2115
Change-Id: I8f6bac3192a0f38b627150be4c8ea128f1e233e5
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
14 files changed:
atomix-storage/pom.xml
atomix-storage/src/main/java/io/atomix/storage/journal/CommitsSegmentJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/DiskJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegment.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java
features/odl-mdsal-clustering-commons/pom.xml
features/odl-mdsal-clustering-commons/src/main/feature/feature.xml

index 886ad6ac50f2cedc4af4cdda9581e0d40453b0e4..8fc7ca37090ee504c12846540ea53d895870cafb 100644 (file)
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jdt</groupId>
       <artifactId>org.eclipse.jdt.annotation</artifactId>
index ac80fed96b75f44a7abb21d36275a5d371eef3b3..65f4de64965498943134a4098d291205f5ed2ae4 100644 (file)
@@ -19,7 +19,7 @@ package io.atomix.storage.journal;
  * A {@link JournalReader} traversing only committed entries.
  */
 final class CommitsSegmentJournalReader<E> extends SegmentedJournalReader<E> {
-    CommitsSegmentJournalReader(SegmentedJournal<E> journal, JournalSegment<E> segment) {
+    CommitsSegmentJournalReader(SegmentedJournal<E> journal, JournalSegment segment) {
         super(journal, segment);
     }
 
index 266320127b214056942380d9d5f9a73437242557..54feee1adaacd07680c8b2b569bb71bb4f4ccbf5 100644 (file)
@@ -39,28 +39,28 @@ import java.nio.channels.FileChannel;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+final class DiskJournalSegmentWriter extends JournalSegmentWriter {
     private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
 
-    private final JournalSegmentReader<E> reader;
+    private final JournalSegmentReader reader;
     private final ByteBuffer buffer;
 
-    DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-        final JournalIndex index, final JournalSerdes namespace) {
-        super(channel, segment, maxEntrySize, index, namespace);
+    DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex index) {
+        super(channel, segment, maxEntrySize, index);
 
         buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-        reader = new JournalSegmentReader<>(segment,
-            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+        reader = new JournalSegmentReader(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
         reset(0);
     }
 
-    DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+    DiskJournalSegmentWriter(final JournalSegmentWriter previous) {
         super(previous);
 
         buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
-        reader = new JournalSegmentReader<>(segment,
-            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
+        reader = new JournalSegmentReader(segment,
+            new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
     }
 
     @Override
@@ -69,17 +69,17 @@ final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     }
 
     @Override
-    MappedJournalSegmentWriter<E> toMapped() {
-        return new MappedJournalSegmentWriter<>(this);
+    MappedJournalSegmentWriter toMapped() {
+        return new MappedJournalSegmentWriter(this);
     }
 
     @Override
-    DiskJournalSegmentWriter<E> toFileChannel() {
+    DiskJournalSegmentWriter toFileChannel() {
         return this;
     }
 
     @Override
-    JournalSegmentReader<E> reader() {
+    JournalSegmentReader reader() {
         return reader;
     }
 
index 9239f86d256eca2d0f481bfa5d59f376fb617cb4..45405aae686f908642c9c3a7309bd113db992686 100644 (file)
@@ -34,18 +34,17 @@ import org.eclipse.jdt.annotation.Nullable;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-final class JournalSegment<E> implements AutoCloseable {
+final class JournalSegment implements AutoCloseable {
   private final JournalSegmentFile file;
   private final JournalSegmentDescriptor descriptor;
   private final StorageLevel storageLevel;
   private final int maxEntrySize;
   private final JournalIndex journalIndex;
-  private final JournalSerdes namespace;
-  private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
+  private final Set<JournalSegmentReader> readers = ConcurrentHashMap.newKeySet();
   private final AtomicInteger references = new AtomicInteger();
   private final FileChannel channel;
 
-  private JournalSegmentWriter<E> writer;
+  private JournalSegmentWriter writer;
   private boolean open = true;
 
   JournalSegment(
@@ -53,13 +52,11 @@ final class JournalSegment<E> implements AutoCloseable {
       JournalSegmentDescriptor descriptor,
       StorageLevel storageLevel,
       int maxEntrySize,
-      double indexDensity,
-      JournalSerdes namespace) {
+      double indexDensity) {
     this.file = file;
     this.descriptor = descriptor;
     this.storageLevel = storageLevel;
     this.maxEntrySize = maxEntrySize;
-    this.namespace = namespace;
     journalIndex = new SparseJournalIndex(indexDensity);
     try {
       channel = FileChannel.open(file.file().toPath(),
@@ -68,9 +65,8 @@ final class JournalSegment<E> implements AutoCloseable {
       throw new StorageException(e);
     }
     writer = switch (storageLevel) {
-        case DISK -> new DiskJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace);
-        case MAPPED -> new MappedJournalSegmentWriter<>(channel, this, maxEntrySize, journalIndex, namespace)
-            .toFileChannel();
+        case DISK -> new DiskJournalSegmentWriter(channel, this, maxEntrySize, journalIndex);
+        case MAPPED -> new MappedJournalSegmentWriter(channel, this, maxEntrySize, journalIndex).toFileChannel();
     };
   }
 
@@ -161,7 +157,7 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @return The segment writer.
    */
-  JournalSegmentWriter<E> acquireWriter() {
+  JournalSegmentWriter acquireWriter() {
     checkOpen();
     acquire();
 
@@ -180,7 +176,7 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @return A new segment reader.
    */
-  JournalSegmentReader<E> createReader() {
+  JournalSegmentReader createReader() {
     checkOpen();
     acquire();
 
@@ -188,7 +184,7 @@ final class JournalSegment<E> implements AutoCloseable {
     final var path = file.file().toPath();
     final var fileReader = buffer != null ? new MappedFileReader(path, buffer)
         : new DiskFileReader(path, channel, descriptor.maxSegmentSize(), maxEntrySize);
-    final var reader = new JournalSegmentReader<>(this, fileReader, maxEntrySize, namespace);
+    final var reader = new JournalSegmentReader(this, fileReader, maxEntrySize);
     reader.setPosition(JournalSegmentDescriptor.BYTES);
     readers.add(reader);
     return reader;
@@ -199,7 +195,7 @@ final class JournalSegment<E> implements AutoCloseable {
    *
    * @param reader the closed segment reader
    */
-  void closeReader(JournalSegmentReader<E> reader) {
+  void closeReader(JournalSegmentReader reader) {
     if (readers.remove(reader)) {
       release();
     }
index 93ccd1748eb9d35ab3789ccbfce0d6ac41f073b3..d89c720c67fef07de93cf623581893fe689dc1ae 100644 (file)
@@ -18,30 +18,28 @@ package io.atomix.storage.journal;
 import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
-import com.esotericsoftware.kryo.KryoException;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.util.zip.CRC32;
 import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class JournalSegmentReader<E> {
+final class JournalSegmentReader {
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentReader.class);
 
-    private final JournalSegment<E> segment;
-    private final JournalSerdes namespace;
+    private final JournalSegment segment;
     private final FileReader fileReader;
     private final int maxSegmentSize;
     private final int maxEntrySize;
 
     private int position;
 
-    JournalSegmentReader(final JournalSegment<E> segment, final FileReader fileReader,
-            final int maxEntrySize, final JournalSerdes namespace) {
+    JournalSegmentReader(final JournalSegment segment, final FileReader fileReader, final int maxEntrySize) {
         this.segment = requireNonNull(segment);
         this.fileReader = requireNonNull(fileReader);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
-        this.namespace = requireNonNull(namespace);
     }
 
     /**
@@ -73,12 +71,12 @@ final class JournalSegmentReader<E> {
     }
 
     /**
-     * Reads the next entry, assigning it specified index.
+     * Reads the next binary data block
      *
      * @param index entry index
-     * @return The entry, or {@code null}
+     * @return The binary data, or {@code null}
      */
-    @Nullable Indexed<E> readEntry(final long index) {
+    @Nullable ByteBuf readBytes(final long index) {
         // Check if there is enough in the buffer remaining
         final int remaining = maxSegmentSize - position - SegmentEntry.HEADER_BYTES;
         if (remaining < 0) {
@@ -102,10 +100,10 @@ final class JournalSegmentReader<E> {
         final int checksum = buffer.getInt(Integer.BYTES);
 
         // Slice off the entry's bytes
-        final var entryBytes = buffer.slice(SegmentEntry.HEADER_BYTES, length);
+        final var entryBuffer = buffer.slice(SegmentEntry.HEADER_BYTES, length);
         // Compute the checksum for the entry bytes.
         final var crc32 = new CRC32();
-        crc32.update(entryBytes);
+        crc32.update(entryBuffer);
 
         // If the stored checksum does not equal the computed checksum, do not proceed further
         final var computed = (int) crc32.getValue();
@@ -115,20 +113,12 @@ final class JournalSegmentReader<E> {
             return null;
         }
 
-        // Attempt to deserialize
-        final E entry;
-        try {
-            entry = namespace.deserialize(entryBytes.rewind());
-        } catch (KryoException e) {
-            // TODO: promote this to a hard error, as it should never happen
-            LOG.debug("Failed to deserialize entry", e);
-            invalidateCache();
-            return null;
-        }
+        // update position
+        position += SegmentEntry.HEADER_BYTES + length;
 
-        // We are all set. Update the position.
-        position = position + SegmentEntry.HEADER_BYTES + length;
-        return new Indexed<>(index, entry, length);
+        // return bytes
+        entryBuffer.rewind();
+        return Unpooled.buffer(length).writeBytes(entryBuffer);
     }
 
     /**
index c7c035be65b06b187ea8f6f508a30be45812155f..c2e0a258c9c4a650b62c73b548a5dc49f61e938b 100644 (file)
@@ -18,8 +18,8 @@ package io.atomix.storage.journal;
 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
 import static java.util.Objects.requireNonNull;
 
-import com.esotericsoftware.kryo.KryoException;
 import io.atomix.storage.journal.index.JournalIndex;
+import io.netty.buffer.ByteBuf;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -29,37 +29,36 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+abstract sealed class JournalSegmentWriter permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
     private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
 
     final @NonNull FileChannel channel;
-    final @NonNull JournalSegment<E> segment;
+    final @NonNull JournalSegment segment;
     private final @NonNull JournalIndex index;
-    final @NonNull JournalSerdes namespace;
     final int maxSegmentSize;
     final int maxEntrySize;
 
-    private Indexed<E> lastEntry;
     private int currentPosition;
+    private Long lastIndex;
+    private ByteBuf lastWritten;
 
-    JournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-            final JournalIndex index, final JournalSerdes namespace) {
+    JournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex index) {
         this.channel = requireNonNull(channel);
         this.segment = requireNonNull(segment);
         this.index = requireNonNull(index);
-        this.namespace = requireNonNull(namespace);
         maxSegmentSize = segment.descriptor().maxSegmentSize();
         this.maxEntrySize = maxEntrySize;
     }
 
-    JournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+    JournalSegmentWriter(final JournalSegmentWriter previous) {
         channel = previous.channel;
         segment = previous.segment;
         index = previous.index;
-        namespace = previous.namespace;
         maxSegmentSize = previous.maxSegmentSize;
         maxEntrySize = previous.maxEntrySize;
-        lastEntry = previous.lastEntry;
+        lastWritten = previous.lastWritten;
+        lastIndex = previous.lastIndex;
         currentPosition = previous.currentPosition;
     }
 
@@ -69,16 +68,16 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The last written index.
      */
     final long getLastIndex() {
-        return lastEntry != null ? lastEntry.index() : segment.firstIndex() - 1;
+        return lastIndex != null ? lastIndex : segment.firstIndex() - 1;
     }
 
     /**
-     * Returns the last entry written.
+     * Returns the last data written.
      *
-     * @return The last entry written.
+     * @return The last data written.
      */
-    final Indexed<E> getLastEntry() {
-        return lastEntry;
+    final ByteBuf getLastWritten() {
+        return lastWritten == null ? null : lastWritten.slice();
     }
 
     /**
@@ -87,63 +86,52 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      * @return The next index to be written.
      */
     final long getNextIndex() {
-        return lastEntry != null ? lastEntry.index() + 1 : segment.firstIndex();
+        return lastIndex != null ? lastIndex + 1 : segment.firstIndex();
     }
 
     /**
-     * Tries to append an entry to the journal.
+     * Tries to append a binary data to the journal.
      *
-     * @param entry The entry to append.
-     * @return The appended indexed entry, or {@code null} if there is not enough space available
+     * @param buf binary data to append
+     * @return The index of appended data, or {@code null} if segment has no space
      */
-    final <T extends E> @Nullable Indexed<T> append(final T entry) {
+    final Long append(final ByteBuf buf) {
+        final var length = buf.readableBytes();
+        if (length > maxEntrySize) {
+            throw new StorageException.TooLarge("Serialized entry size exceeds maximum allowed bytes ("
+                + maxEntrySize + ")");
+        }
+
         // Store the entry index.
         final long index = getNextIndex();
         final int position = currentPosition;
 
-        // Serialize the entry.
-        final int bodyPosition = position + HEADER_BYTES;
-        final int avail = maxSegmentSize - bodyPosition;
-        if (avail < 0) {
+        // check space available
+        final int nextPosition = position + HEADER_BYTES + length;
+        if (nextPosition >= maxSegmentSize) {
             LOG.trace("Not enough space for {} at {}", index, position);
             return null;
         }
 
-        final var writeLimit = Math.min(avail, maxEntrySize);
-        final var diskEntry = startWrite(position, writeLimit + HEADER_BYTES).position(HEADER_BYTES);
-        try {
-            namespace.serialize(entry, diskEntry);
-        } catch (KryoException e) {
-            if (writeLimit != maxEntrySize) {
-                // We have not provided enough capacity, signal to roll to next segment
-                LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
-                return null;
-            }
-
-            // Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
-            throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
-        }
-
-        final int length = diskEntry.position() - HEADER_BYTES;
+        // allocate buffer and write data
+        final var writeBuffer = startWrite(position, length + HEADER_BYTES).position(HEADER_BYTES);
+        writeBuffer.put(buf.nioBuffer());
 
         // Compute the checksum for the entry.
         final var crc32 = new CRC32();
-        crc32.update(diskEntry.flip().position(HEADER_BYTES));
+        crc32.update(writeBuffer.flip().position(HEADER_BYTES));
 
         // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
-        diskEntry.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
-        commitWrite(position, diskEntry.rewind());
+        writeBuffer.putInt(0, length).putInt(Integer.BYTES, (int) crc32.getValue());
+        commitWrite(position, writeBuffer.rewind());
 
         // Update the last entry with the correct index/term/length.
-        final var indexedEntry = new Indexed<E>(index, entry, length);
-        lastEntry = indexedEntry;
+        currentPosition = nextPosition;
+        lastWritten = buf;
+        lastIndex = index;
         this.index.index(index, position);
 
-        currentPosition = bodyPosition + length;
-
-        @SuppressWarnings("unchecked")
-        final var ugly = (Indexed<T>) indexedEntry;
-        return ugly;
+        return index;
     }
 
     abstract ByteBuffer startWrite(int position, int size);
@@ -167,9 +155,9 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         }
     }
 
-    abstract JournalSegmentReader<E> reader();
+    abstract JournalSegmentReader reader();
 
-    private void resetWithBuffer(final JournalSegmentReader<E> reader, final long index) {
+    private void resetWithBuffer(final JournalSegmentReader reader, final long index) {
         long nextIndex = segment.firstIndex();
 
         // Clear the buffer indexes and acquire ownership of the buffer
@@ -177,17 +165,18 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
         reader.setPosition(JournalSegmentDescriptor.BYTES);
 
         while (index == 0 || nextIndex <= index) {
-            final var entry = reader.readEntry(nextIndex);
-            if (entry == null) {
+            final var buf = reader.readBytes(nextIndex);
+            if (buf == null) {
                 break;
             }
 
-            lastEntry = entry;
+            lastWritten = buf;
+            lastIndex = nextIndex;
             this.index.index(nextIndex, currentPosition);
             nextIndex++;
 
             // Update the current position for indexing.
-            currentPosition = currentPosition + HEADER_BYTES + entry.size();
+            currentPosition += HEADER_BYTES + buf.readableBytes();
         }
     }
 
@@ -202,8 +191,9 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
             return;
         }
 
-        // Reset the last entry.
-        lastEntry = null;
+        // Reset the last written
+        lastIndex = null;
+        lastWritten = null;
 
         // Truncate the index.
         this.index.truncate(index);
@@ -245,7 +235,7 @@ abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter,
      */
     abstract @Nullable MappedByteBuffer buffer();
 
-    abstract @NonNull MappedJournalSegmentWriter<E> toMapped();
+    abstract @NonNull MappedJournalSegmentWriter toMapped();
 
-    abstract @NonNull DiskJournalSegmentWriter<E> toFileChannel();
+    abstract @NonNull DiskJournalSegmentWriter toFileChannel();
 }
index 32fc8d30c619780184863a15051063cc94d4c754..a970882edfa1203830cb36a7b75df867af012433 100644 (file)
@@ -26,7 +26,10 @@ import java.nio.ByteBuffer;
 
 /**
  * Support for serialization of {@link Journal} entries.
+ *
+ * @deprecated due to dependency on outdated Kryo library, {@link JournalSerializer} to be used instead.
  */
+@Deprecated(forRemoval = true, since="9.0.3")
 public interface JournalSerdes {
     /**
      * Serializes given object to byte array.
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java b/atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerializer.java
new file mode 100644 (file)
index 0000000..eff9af8
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+
+/**
+ * Support for serialization of {@link Journal} entries.
+ */
+public interface JournalSerializer<T> {
+
+    /**
+     * Serializes given object to byte array.
+     *
+     * @param obj Object to serialize
+     * @return serialized bytes as {@link ByteBuf}
+     */
+    ByteBuf serialize(T obj) ;
+
+    /**
+     * Deserializes given byte array to Object.
+     *
+     * @param buf serialized bytes as {@link ByteBuf}
+     * @return deserialized Object
+     */
+    T deserialize(final ByteBuf buf);
+
+    static <E> JournalSerializer<E> wrap(final JournalSerdes serdes) {
+        return new JournalSerializer<>() {
+            @Override
+            public ByteBuf serialize(final E obj) {
+                return Unpooled.wrappedBuffer(serdes.serialize(obj));
+            }
+
+            @Override
+            public E deserialize(final ByteBuf buf) {
+                return serdes.deserialize(ByteBufUtil.getBytes(buf));
+            }
+        };
+    }
+}
index 00dd4c6cecab4bc20616d1823ecd5dba1571874b..48d9e764f6b5acd688e19019bcce79c9376137dd 100644 (file)
@@ -38,29 +38,29 @@ import org.eclipse.jdt.annotation.NonNull;
  *
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
-final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
+final class MappedJournalSegmentWriter extends JournalSegmentWriter {
     private final @NonNull MappedByteBuffer mappedBuffer;
-    private final JournalSegmentReader<E> reader;
+    private final JournalSegmentReader reader;
     private final ByteBuffer buffer;
 
-    MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
-        final JournalIndex index, final JournalSerdes namespace) {
-        super(channel, segment, maxEntrySize, index, namespace);
+    MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
+            final JournalIndex index) {
+        super(channel, segment, maxEntrySize, index);
 
         mappedBuffer = mapBuffer(channel, maxSegmentSize);
         buffer = mappedBuffer.slice();
-        reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
-            maxEntrySize, namespace);
+        reader = new JournalSegmentReader(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+            maxEntrySize);
         reset(0);
     }
 
-    MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
+    MappedJournalSegmentWriter(final JournalSegmentWriter previous) {
         super(previous);
 
         mappedBuffer = mapBuffer(channel, maxSegmentSize);
         buffer = mappedBuffer.slice();
-        reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
-            maxEntrySize, namespace);
+        reader = new JournalSegmentReader(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
+            maxEntrySize);
     }
 
     private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
@@ -77,18 +77,18 @@ final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
     }
 
     @Override
-    MappedJournalSegmentWriter<E> toMapped() {
+    MappedJournalSegmentWriter toMapped() {
         return this;
     }
 
     @Override
-    DiskJournalSegmentWriter<E> toFileChannel() {
+    DiskJournalSegmentWriter toFileChannel() {
         close();
-        return new DiskJournalSegmentWriter<>(this);
+        return new DiskJournalSegmentWriter(this);
     }
 
     @Override
-    JournalSegmentReader<E> reader() {
+    JournalSegmentReader reader() {
         return reader;
     }
 
index ef1a4cf028ebba5a8293e000cccc3a2a1e8eb95f..7289d3deefeab09c2855905233319f8760256a49 100644 (file)
  */
 package io.atomix.storage.journal;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -31,10 +35,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
 /**
  * Segmented journal.
  */
@@ -54,7 +54,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
   private final String name;
   private final StorageLevel storageLevel;
   private final File directory;
-  private final JournalSerdes namespace;
+  private final JournalSerializer<E> serializer;
   private final int maxSegmentSize;
   private final int maxEntrySize;
   private final int maxEntriesPerSegment;
@@ -63,9 +63,9 @@ public final class SegmentedJournal<E> implements Journal<E> {
   private final SegmentedJournalWriter<E> writer;
   private volatile long commitIndex;
 
-  private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
-  private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
-  private JournalSegment<E> currentSegment;
+  private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
+  private final Collection<SegmentedJournalReader> readers = ConcurrentHashMap.newKeySet();
+  private JournalSegment currentSegment;
 
   private volatile boolean open = true;
 
@@ -82,7 +82,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
     this.name = requireNonNull(name, "name cannot be null");
     this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
     this.directory = requireNonNull(directory, "directory cannot be null");
-    this.namespace = requireNonNull(namespace, "namespace cannot be null");
+    this.serializer = JournalSerializer.wrap(requireNonNull(namespace, "namespace cannot be null"));
     this.maxSegmentSize = maxSegmentSize;
     this.maxEntrySize = maxEntrySize;
     this.maxEntriesPerSegment = maxEntriesPerSegment;
@@ -166,7 +166,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    *
    * @return the collection of journal segments
    */
-  public Collection<JournalSegment<E>> segments() {
+  public Collection<JournalSegment> segments() {
     return segments.values();
   }
 
@@ -176,10 +176,19 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @param index the starting index
    * @return the journal segments starting with indexes greater than or equal to the given index
    */
-  public Collection<JournalSegment<E>> segments(long index) {
+  public Collection<JournalSegment> segments(long index) {
     return segments.tailMap(index).values();
   }
 
+  /**
+   * Returns serializer instance.
+   *
+   * @return serializer instance
+   */
+  JournalSerializer<E> serializer() {
+    return serializer;
+  }
+
   /**
    * Returns the total size of the journal.
    *
@@ -230,7 +239,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    */
   private synchronized void open() {
     // Load existing log segments from disk.
-    for (JournalSegment<E> segment : loadSegments()) {
+    for (JournalSegment segment : loadSegments()) {
       segments.put(segment.descriptor().index(), segment);
     }
 
@@ -274,7 +283,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * Resets the current segment, creating a new segment if necessary.
    */
   private synchronized void resetCurrentSegment() {
-    JournalSegment<E> lastSegment = getLastSegment();
+    JournalSegment lastSegment = getLastSegment();
     if (lastSegment != null) {
       currentSegment = lastSegment;
     } else {
@@ -297,16 +306,16 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @param index the starting index of the journal
    * @return the first segment
    */
-  JournalSegment<E> resetSegments(long index) {
+  JournalSegment resetSegments(long index) {
     assertOpen();
 
     // If the index already equals the first segment index, skip the reset.
-    JournalSegment<E> firstSegment = getFirstSegment();
+    JournalSegment firstSegment = getFirstSegment();
     if (index == firstSegment.firstIndex()) {
       return firstSegment;
     }
 
-    for (JournalSegment<E> segment : segments.values()) {
+    for (JournalSegment segment : segments.values()) {
       segment.close();
       segment.delete();
     }
@@ -328,9 +337,9 @@ public final class SegmentedJournal<E> implements Journal<E> {
    *
    * @throws IllegalStateException if the segment manager is not open
    */
-  JournalSegment<E> getFirstSegment() {
+  JournalSegment getFirstSegment() {
     assertOpen();
-    Map.Entry<Long, JournalSegment<E>> segment = segments.firstEntry();
+    Map.Entry<Long, JournalSegment> segment = segments.firstEntry();
     return segment != null ? segment.getValue() : null;
   }
 
@@ -339,9 +348,9 @@ public final class SegmentedJournal<E> implements Journal<E> {
    *
    * @throws IllegalStateException if the segment manager is not open
    */
-  JournalSegment<E> getLastSegment() {
+  JournalSegment getLastSegment() {
     assertOpen();
-    Map.Entry<Long, JournalSegment<E>> segment = segments.lastEntry();
+    Map.Entry<Long, JournalSegment> segment = segments.lastEntry();
     return segment != null ? segment.getValue() : null;
   }
 
@@ -351,11 +360,11 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @return The next segment.
    * @throws IllegalStateException if the segment manager is not open
    */
-  synchronized JournalSegment<E> getNextSegment() {
+  synchronized JournalSegment getNextSegment() {
     assertOpen();
     assertDiskSpace();
 
-    JournalSegment<E> lastSegment = getLastSegment();
+    JournalSegment lastSegment = getLastSegment();
     JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
         .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
         .withIndex(currentSegment.lastIndex() + 1)
@@ -375,8 +384,8 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @param index The segment index with which to look up the next segment.
    * @return The next segment for the given index.
    */
-  JournalSegment<E> getNextSegment(long index) {
-    Map.Entry<Long, JournalSegment<E>> nextSegment = segments.higherEntry(index);
+  JournalSegment getNextSegment(long index) {
+    Map.Entry<Long, JournalSegment> nextSegment = segments.higherEntry(index);
     return nextSegment != null ? nextSegment.getValue() : null;
   }
 
@@ -386,7 +395,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @param index The index for which to return the segment.
    * @throws IllegalStateException if the segment manager is not open
    */
-  synchronized JournalSegment<E> getSegment(long index) {
+  synchronized JournalSegment getSegment(long index) {
     assertOpen();
     // Check if the current segment contains the given index first in order to prevent an unnecessary map lookup.
     if (currentSegment != null && index > currentSegment.firstIndex()) {
@@ -394,7 +403,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
     }
 
     // If the index is in another segment, get the entry with the next lowest first index.
-    Map.Entry<Long, JournalSegment<E>> segment = segments.floorEntry(index);
+    Map.Entry<Long, JournalSegment> segment = segments.floorEntry(index);
     if (segment != null) {
       return segment.getValue();
     }
@@ -406,7 +415,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    *
    * @param segment The segment to remove.
    */
-  synchronized void removeSegment(JournalSegment<E> segment) {
+  synchronized void removeSegment(JournalSegment segment) {
     segments.remove(segment.firstIndex());
     segment.close();
     segment.delete();
@@ -416,7 +425,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
   /**
    * Creates a new segment.
    */
-  JournalSegment<E> createSegment(JournalSegmentDescriptor descriptor) {
+  JournalSegment createSegment(JournalSegmentDescriptor descriptor) {
     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, descriptor.id());
 
     RandomAccessFile raf;
@@ -443,7 +452,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
       } catch (IOException e) {
       }
     }
-    JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+    JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
     LOG.debug("Created segment: {}", segment);
     return segment;
   }
@@ -455,21 +464,21 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @param descriptor The segment descriptor.
    * @return The segment instance.
    */
-  protected JournalSegment<E> newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
-    return new JournalSegment<>(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity, namespace);
+  protected JournalSegment newSegment(JournalSegmentFile segmentFile, JournalSegmentDescriptor descriptor) {
+    return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
   }
 
   /**
    * Loads a segment.
    */
-  private JournalSegment<E> loadSegment(long segmentId) {
+  private JournalSegment loadSegment(long segmentId) {
     File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
     ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
     try (FileChannel channel = openChannel(segmentFile)) {
       channel.read(buffer);
       buffer.flip();
       JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
-      JournalSegment<E> segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
+      JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
       LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
       return segment;
     } catch (IOException e) {
@@ -490,11 +499,11 @@ public final class SegmentedJournal<E> implements Journal<E> {
    *
    * @return A collection of segments for the log.
    */
-  protected Collection<JournalSegment<E>> loadSegments() {
+  protected Collection<JournalSegment> loadSegments() {
     // Ensure log directories are created.
     directory.mkdirs();
 
-    TreeMap<Long, JournalSegment<E>> segments = new TreeMap<>();
+    TreeMap<Long, JournalSegment> segments = new TreeMap<>();
 
     // Iterate through all files in the log directory.
     for (File file : directory.listFiles(File::isFile)) {
@@ -513,7 +522,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
         JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
 
         // Load the segment.
-        JournalSegment<E> segment = loadSegment(descriptor.id());
+        JournalSegment segment = loadSegment(descriptor.id());
 
         // Add the segment to the segments list.
         LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
@@ -522,11 +531,11 @@ public final class SegmentedJournal<E> implements Journal<E> {
     }
 
     // Verify that all the segments in the log align with one another.
-    JournalSegment<E> previousSegment = null;
+    JournalSegment previousSegment = null;
     boolean corrupted = false;
-    Iterator<Map.Entry<Long, JournalSegment<E>>> iterator = segments.entrySet().iterator();
+    Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
     while (iterator.hasNext()) {
-      JournalSegment<E> segment = iterator.next().getValue();
+      JournalSegment segment = iterator.next().getValue();
       if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
         LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
         corrupted = true;
@@ -584,7 +593,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @return indicates whether a segment can be removed from the journal
    */
   public boolean isCompactable(long index) {
-    Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
+    Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
     return segmentEntry != null && segments.headMap(segmentEntry.getValue().firstIndex()).size() > 0;
   }
 
@@ -595,7 +604,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
    * @return the starting index of the last segment in the log
    */
   public long getCompactableIndex(long index) {
-    Map.Entry<Long, JournalSegment<E>> segmentEntry = segments.floorEntry(index);
+    Map.Entry<Long, JournalSegment> segmentEntry = segments.floorEntry(index);
     return segmentEntry != null ? segmentEntry.getValue().firstIndex() : 0;
   }
 
@@ -612,7 +621,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
       final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
       if (!compactSegments.isEmpty()) {
         LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
-        for (JournalSegment<E> segment : compactSegments.values()) {
+        for (JournalSegment segment : compactSegments.values()) {
           LOG.trace("Deleting segment: {}", segment);
           segment.close();
           segment.delete();
index cc0fe0d31c0c71dcaf7bed6bf7cedcfbf001b91a..42f40e0c72f92bee28de27905b27feac2074a8cb 100644 (file)
@@ -24,12 +24,12 @@ import static java.util.Objects.requireNonNull;
 sealed class SegmentedJournalReader<E> implements JournalReader<E> permits CommitsSegmentJournalReader {
     final SegmentedJournal<E> journal;
 
-    private JournalSegment<E> currentSegment;
-    private JournalSegmentReader<E> currentReader;
+    private JournalSegment currentSegment;
+    private JournalSegmentReader currentReader;
     private Indexed<E> currentEntry;
     private long nextIndex;
 
-    SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment<E> segment) {
+    SegmentedJournalReader(final SegmentedJournal<E> journal, final JournalSegment segment) {
         this.journal = requireNonNull(journal);
         currentSegment = requireNonNull(segment);
         currentReader = segment.createReader();
@@ -99,7 +99,7 @@ sealed class SegmentedJournalReader<E> implements JournalReader<E> permits Commi
      */
     private void rewind(final long index) {
         if (currentSegment.firstIndex() >= index) {
-            JournalSegment<E> segment = journal.getSegment(index - 1);
+            JournalSegment segment = journal.getSegment(index - 1);
             if (segment != null) {
                 currentReader.close();
 
@@ -113,8 +113,8 @@ sealed class SegmentedJournalReader<E> implements JournalReader<E> permits Commi
 
     @Override
     public Indexed<E> tryNext() {
-        var next = currentReader.readEntry(nextIndex);
-        if (next == null) {
+        var buf = currentReader.readBytes(nextIndex);
+        if (buf == null) {
             final var nextSegment = journal.getNextSegment(currentSegment.firstIndex());
             if (nextSegment == null || nextSegment.firstIndex() != nextIndex) {
                 return null;
@@ -124,15 +124,15 @@ sealed class SegmentedJournalReader<E> implements JournalReader<E> permits Commi
 
             currentSegment = nextSegment;
             currentReader = currentSegment.createReader();
-            next = currentReader.readEntry(nextIndex);
-            if (next == null) {
+            buf = currentReader.readBytes(nextIndex);
+            if (buf == null) {
                 return null;
             }
         }
 
-        nextIndex = nextIndex + 1;
-        currentEntry = next;
-        return next;
+        final var entry = journal.serializer().deserialize(buf);
+        currentEntry = new Indexed<>(nextIndex++, entry, buf.readableBytes());
+        return currentEntry;
     }
 
     @Override
index a95622e5535ef69af5c405729fd84d6bd96ab515..9ff535284dc23b574ef54ff00f864fdcbeafa6a6 100644 (file)
@@ -22,8 +22,8 @@ import static com.google.common.base.Verify.verifyNotNull;
  */
 final class SegmentedJournalWriter<E> implements JournalWriter<E> {
   private final SegmentedJournal<E> journal;
-  private JournalSegment<E> currentSegment;
-  private JournalSegmentWriter<E> currentWriter;
+  private JournalSegment currentSegment;
+  private JournalSegmentWriter currentWriter;
 
   SegmentedJournalWriter(SegmentedJournal<E> journal) {
     this.journal = journal;
@@ -38,7 +38,12 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
 
   @Override
   public Indexed<E> getLastEntry() {
-    return currentWriter.getLastEntry();
+    final var lastWritten = currentWriter.getLastWritten();
+    if (lastWritten == null) {
+      return null;
+    }
+    final E deserialized = journal.serializer().deserialize(lastWritten);
+    return new Indexed<>(currentWriter.getLastIndex(), deserialized, lastWritten.readableBytes()) ;
   }
 
   @Override
@@ -70,9 +75,10 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
 
   @Override
   public <T extends E> Indexed<T> append(T entry) {
-    var indexed = currentWriter.append(entry);
-    if (indexed != null) {
-      return indexed;
+    final var bytes = journal.serializer().serialize(entry);
+    var index = currentWriter.append(bytes);
+    if (index != null) {
+      return new Indexed<>(index, entry, bytes.readableBytes());
     }
 
     //  Slow path: we do not have enough capacity
@@ -80,7 +86,8 @@ final class SegmentedJournalWriter<E> implements JournalWriter<E> {
     currentSegment.releaseWriter();
     currentSegment = journal.getNextSegment();
     currentWriter = currentSegment.acquireWriter();
-    return verifyNotNull(currentWriter.append(entry));
+    final var newIndex = verifyNotNull(currentWriter.append(bytes));
+    return new Indexed<>(newIndex, entry, bytes.readableBytes());
   }
 
   @Override
index 5d1bbbeb51ebc545edb2c2892b70d621bd16603e..0d98ae70987780a11766827b2b485be08e4887f5 100644 (file)
             <type>xml</type>
             <classifier>features</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.odlparent</groupId>
+            <artifactId>odl-netty-4</artifactId>
+            <type>xml</type>
+            <classifier>features</classifier>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.odlparent</groupId>
             <artifactId>odl-servlet-api</artifactId>
index f4034164f185861c2858c6ab5db297bc8a5a2331..7a41fc13bbfa9a4a0bc6c29e1835f3b590e75355 100644 (file)
@@ -10,6 +10,7 @@
     <feature name="odl-mdsal-clustering-commons" version="${project.version}">
         <feature version="[13,14)">odl-apache-commons-lang3</feature>
         <feature version="[13,14)">odl-dropwizard-metrics</feature>
+        <feature version="[13,14)">odl-netty-4</feature>
         <feature version="[13,14)">odl-servlet-api</feature>
         <feature version="[13,14)">odl-yangtools-data</feature>
         <feature version="[13,14)">odl-yangtools-codec</feature>