Separate out {From,To}ByteBufMapper 88/111688/1
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 14 May 2024 12:13:47 +0000 (14:13 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 14 May 2024 12:17:35 +0000 (14:17 +0200)
We have overlapping method definitions on the read side. Let's split up
ByteBufMapper into two interfaces so they can be reusable.

JIRA: CONTROLLER-2115
Change-Id: I9c897a83e15818a9ca5269cf2b22d725d5d706b9
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
14 files changed:
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/FromByteBufMapper.java [new file with mode: 0644]
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSegmentWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedByteBufWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalReader.java
atomix-storage/src/main/java/io/atomix/storage/journal/SegmentedJournalWriter.java
atomix-storage/src/main/java/io/atomix/storage/journal/ToByteBufMapper.java [moved from atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java with 77% similarity]
atomix-storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java
opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java

index 80135118b926810b4a6ab8680c611a9aed81b4ff..c43e0ea8954553d7ee5e3875b873e5777b90c4b0 100644 (file)
@@ -15,7 +15,6 @@
  */
 package io.atomix.storage.journal;
 
-import io.netty.buffer.ByteBuf;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 
@@ -24,23 +23,6 @@ import org.eclipse.jdt.annotation.Nullable;
  */
 @NonNullByDefault
 public interface ByteBufReader extends AutoCloseable {
-    /**
-     * A journal entry processor. Responsible for transforming bytes into their internal representation.
-     *
-     * @param <T> Internal representation type
-     */
-    @FunctionalInterface
-    interface EntryMapper<T> {
-        /**
-         * Process an entry.
-         *
-         * @param index entry index
-         * @param bytes entry bytes
-         * @return resulting internal representation
-         */
-        T mapEntry(long index, ByteBuf bytes);
-    }
-
     /**
      * Returns the next reader index.
      *
@@ -51,10 +33,10 @@ public interface ByteBufReader extends AutoCloseable {
     /**
      * Try to move to the next binary data block.
      *
-     * @param entryMapper callback to be invoked on binary data
+     * @param mapper callback to be invoked on binary data
      * @return processed binary data, or {@code null}
      */
-    <T> @Nullable T tryNext(EntryMapper<T> entryMapper);
+    <T> @Nullable T tryNext(FromByteBufMapper<T> mapper);
 
     /**
      * Resets the reader to the start.
index c092f20d50ea4adddce40871775deb2ae0720934..eee75cb8f6b37ef0517acd1fc2d6d5532ce7fd09 100644 (file)
@@ -32,12 +32,12 @@ public interface ByteBufWriter {
     /**
      * Appends an entry to the journal.
      *
-     * @param mapper a {@link ByteBufMapper} to use with entry
+     * @param mapper a {@link ToByteBufMapper} to use with entry
      * @param entry entry to append
      * @return the on-disk size of the entry
      */
     // FIXME: throws IOException
-    <T> int append(ByteBufMapper<T> mapper, T entry);
+    <T> int append(ToByteBufMapper<T> mapper, T entry);
 
     /**
      * Commits entries up to the given index.
diff --git a/atomix-storage/src/main/java/io/atomix/storage/journal/FromByteBufMapper.java b/atomix-storage/src/main/java/io/atomix/storage/journal/FromByteBufMapper.java
new file mode 100644 (file)
index 0000000..5a0c736
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Interface for transforming bytes into their internal representation.
+ *
+ * @param <T> Internal representation type
+ */
+@NonNullByDefault
+@FunctionalInterface
+public interface FromByteBufMapper<T> {
+    /**
+     * Converts the contents of a {@link ByteBuf} to an object.
+     *
+     * @param index entry index
+     * @param bytes entry bytes
+     * @return resulting object
+     */
+    T bytesToObject(long index, ByteBuf bytes);
+}
index 9b083bba0636ec756fbd01f8a0497b31b8c05334..75c8e25b459c224fd362c86e0fbb057a362bb1c6 100644 (file)
@@ -74,7 +74,7 @@ final class JournalSegmentWriter {
      * @param entry the entry
      * @return the entry size, or {@code null} if segment has no space
      */
-    <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+    <T> @Nullable Integer append(final ToByteBufMapper<T> mapper, final T entry) {
         // we are appending at this index and position
         final long index = nextIndex();
         final int position = currentPosition;
index 0265b8c6799e4bb2df144214d06fb3620b7616d2..8ae7dbed707a1e89385b15f0080ac005a6554364 100644 (file)
@@ -20,7 +20,6 @@ import com.esotericsoftware.kryo.KryoException;
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
-import io.netty.buffer.ByteBuf;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +29,7 @@ import java.nio.ByteBuffer;
 /**
  * Support for serialization of {@link Journal} entries.
  *
- * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead.
+ * @deprecated due to dependency on outdated Kryo library, {@link FromByteBufMapper} to be used instead.
  */
 @Deprecated(forRemoval = true, since = "9.0.3")
 public interface JournalSerdes {
@@ -113,49 +112,51 @@ public interface JournalSerdes {
      */
     <T> T deserialize(InputStream stream, int bufferSize);
 
+
     /**
-     * Returns a {@link ByteBufMapper} backed by this object.
+     * Returns a {@link FromByteBufMapper} backed by this object.
      *
-     * @return a {@link ByteBufMapper} backed by this object
+     * @return a {@link FromByteBufMapper} backed by this object
      */
-    default <T> ByteBufMapper<T> toMapper() {
-        return new ByteBufMapper<>() {
-            @Override
-            public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException {
-                final var buffer = bytes.nioBuffer();
-                try {
-                    serialize(obj, buffer);
-                } catch (KryoException e) {
-                    throw newIOException(e);
-                } finally {
-                    // adjust writerIndex so that readableBytes() the bytes written
-                    bytes.writerIndex(bytes.readerIndex() + buffer.position());
-                }
-            }
+    default <T> FromByteBufMapper<T> toReadMapper() {
+        return (index, bytes) -> deserialize(bytes.nioBuffer());
+    }
 
-            @Override
-            public T bytesToObject(final long index, final ByteBuf bytes) {
-                return deserialize(bytes.nioBuffer());
+    /**
+     * Returns a {@link ToByteBufMapper} backed by this object.
+     *
+     * @return a {@link ToByteBufMapper} backed by this object
+     */
+    default <T> ToByteBufMapper<T> toWriteMapper() {
+        return (obj, buf) -> {
+            final var nio = buf.nioBuffer();
+            try {
+                serialize(obj, nio);
+            } catch (KryoException e) {
+                throw newIOException(e);
+            } finally {
+                // adjust writerIndex so that readableBytes() the bytes written
+                buf.writerIndex(buf.readerIndex() + nio.position());
             }
+        };
+    }
 
-            private static IOException newIOException(final KryoException cause) {
-                // We may have multiple nested KryoExceptions, intertwined with others, like IOExceptions. Let's find
-                // the deepest one.
-                var rootKryo = cause;
-                for (var nextCause = rootKryo.getCause(); nextCause != null; nextCause = nextCause.getCause()) {
-                    if (nextCause instanceof KryoException kryo) {
-                        rootKryo = kryo;
-                    }
-                }
-                // It would be nice to have a better way of discerning these, but alas it is what it is.
-                if (rootKryo.getMessage().startsWith("Buffer overflow.")) {
-                    final var ex = new EOFException();
-                    ex.initCause(cause);
-                    return ex;
-                }
-                return new IOException(rootKryo);
+    private static IOException newIOException(final KryoException cause) {
+        // We may have multiple nested KryoExceptions, intertwined with others, like IOExceptions. Let's find
+        // the deepest one.
+        var rootKryo = cause;
+        for (var nextCause = rootKryo.getCause(); nextCause != null; nextCause = nextCause.getCause()) {
+            if (nextCause instanceof KryoException kryo) {
+                rootKryo = kryo;
             }
-        };
+        }
+        // It would be nice to have a better way of discerning these, but alas it is what it is.
+        if (rootKryo.getMessage().startsWith("Buffer overflow.")) {
+            final var ex = new EOFException();
+            ex.initCause(cause);
+            return ex;
+        }
+        return new IOException(rootKryo);
     }
 
     /**
index 215255a4dc788ac6f9b7361fae9a4e70bebbdc51..72a954325c3350a1b05c32296210a1cab16c1757 100644 (file)
@@ -100,10 +100,10 @@ sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCo
     }
 
     @Override
-    public final <T> T tryNext(final EntryMapper<T> entryMapper) {
+    public final <T> T tryNext(final FromByteBufMapper<T> mapper) {
         final var index = nextIndex;
         final var bytes = tryAdvance(index);
-        return bytes == null ? null : entryMapper.mapEntry(index, bytes);
+        return bytes == null ? null : mapper.bytesToObject(index, bytes);
     }
 
     /**
index 103d5cf7632365f1a4d5e032471394a016d12339..1875a71b32cc0f1b5b45539f22dc61af886c8081 100644 (file)
@@ -50,13 +50,13 @@ final class SegmentedByteBufWriter implements ByteBufWriter {
     }
 
     @Override
-    public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
+    public <T> int append(final ToByteBufMapper<T> mapper, final T entry) {
         final var size = currentWriter.append(mapper, entry);
         return size != null ? size : appendToNextSegment(mapper, entry);
     }
 
     //  Slow path: we do not have enough capacity
-    private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
+    private <T> int appendToNextSegment(final ToByteBufMapper<T> mapper, final T entry) {
         currentWriter.flush();
         currentSegment.releaseWriter();
         currentSegment = journal.createNextSegment();
index d143ee42fd843cbb44b5cf5bc0bc221435a8089a..501e650dc59baa28a5db15f2964eebc6a1e903b3 100644 (file)
@@ -19,19 +19,22 @@ package io.atomix.storage.journal;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
 
 /**
  * A {@link Journal} implementation based on a {@link ByteBufJournal}.
  */
 public final class SegmentedJournal<E> implements Journal<E> {
-    private final SegmentedJournalWriter<E> writer;
-    private final ByteBufMapper<E> mapper;
-    private final ByteBufJournal journal;
+    private final @NonNull SegmentedJournalWriter<E> writer;
+    private final @NonNull FromByteBufMapper<E> readMapper;
+    private final @NonNull ByteBufJournal journal;
 
-    public SegmentedJournal(final ByteBufJournal journal, final ByteBufMapper<E> mapper) {
+    public SegmentedJournal(final ByteBufJournal journal, final FromByteBufMapper<E> readMapper,
+            final ToByteBufMapper<E> writeMapper) {
         this.journal = requireNonNull(journal, "journal is required");
-        this.mapper = requireNonNull(mapper, "mapper cannot be null");
-        writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
+        this.readMapper = requireNonNull(readMapper, "readMapper cannot be null");
+        writer = new SegmentedJournalWriter<>(journal.writer(),
+            requireNonNull(writeMapper, "writeMapper cannot be null"));
     }
 
     @Override
@@ -67,7 +70,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
             case ALL -> journal.openReader(index);
             case COMMITS -> journal.openCommitsReader(index);
         };
-        return new SegmentedJournalReader<>(byteReader, mapper);
+        return new SegmentedJournalReader<>(byteReader, readMapper);
     }
 
     @Override
index 99fdbb64b1577e14bfe05e3ccf7e67e11cfe691c..12ad0a437c3a7905273f495b470b98926a176bb0 100644 (file)
@@ -26,10 +26,10 @@ import org.eclipse.jdt.annotation.Nullable;
  */
 @NonNullByDefault
 final class SegmentedJournalReader<E> implements JournalReader<E> {
-    private final ByteBufMapper<E> mapper;
+    private final FromByteBufMapper<E> mapper;
     private final ByteBufReader reader;
 
-    SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper<E> mapper) {
+    SegmentedJournalReader(final ByteBufReader reader, final FromByteBufMapper<E> mapper) {
         this.reader = requireNonNull(reader);
         this.mapper = requireNonNull(mapper);
     }
index 144babce761def9982ef9fe75d63305cf0b8cb98..f945a40c2f55fba3301215686b26ad3d6e855e85 100644 (file)
@@ -25,10 +25,10 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
  */
 @NonNullByDefault
 final class SegmentedJournalWriter<E> implements JournalWriter<E> {
-    private final ByteBufMapper<E> mapper;
+    private final ToByteBufMapper<E> mapper;
     private final ByteBufWriter writer;
 
-    SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+    SegmentedJournalWriter(final ByteBufWriter writer, final ToByteBufMapper<E> mapper) {
         this.writer = requireNonNull(writer);
         this.mapper = requireNonNull(mapper);
     }
similarity index 77%
rename from atomix-storage/src/main/java/io/atomix/storage/journal/ByteBufMapper.java
rename to atomix-storage/src/main/java/io/atomix/storage/journal/ToByteBufMapper.java
index fa477aa554de990e17c9796322352313ce44868d..268eacdb21d75b2c783b4c796f12328da99c9c85 100644 (file)
@@ -21,19 +21,13 @@ import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
- * Support for mapping of {@link ByteBufJournal} entries to and from {@link ByteBuf}s.
+ * Interface for transforming internal represetation to bytes.
+ *
+ * @param <T> Internal representation type
  */
 @NonNullByDefault
-public interface ByteBufMapper<T> {
-    /**
-     * Converts the contents of a {@link ByteBuf} to an object.
-     *
-     * @param index entry index
-     * @param bytes entry bytes
-     * @return resulting object
-     */
-    T bytesToObject(long index, ByteBuf bytes);
-
+@FunctionalInterface
+public interface ToByteBufMapper<T> {
     /**
      * Converts an object into a series of bytes in the specified {@link ByteBuf}.
      *
index 03b5ab4c36464f1f5d893c4f103f7bdac9302b25..d9602c357a887e88cee712025c0a632b822ef0a2 100644 (file)
@@ -83,7 +83,7 @@ public abstract class AbstractJournalTest {
             .withStorageLevel(storageLevel)
             .withMaxSegmentSize(maxSegmentSize)
             .withIndexDensity(.2)
-            .build(), NAMESPACE.toMapper());
+            .build(), NAMESPACE.toReadMapper(), NAMESPACE.toWriteMapper());
     }
 
     @Test
index c5f9d7205eb3d9b7886a818c8333e268079372ee..935ded32e27931c423325fd6831958780a0119cc 100644 (file)
@@ -41,16 +41,18 @@ final class DataJournalV0 extends DataJournal {
     DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
             final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
         super(persistenceId, messageSize);
+
+        final var serdes = JournalSerdes.builder()
+            .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
+            .build();
+
         entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
             .withDirectory(directory)
             .withName("data")
             .withStorageLevel(storage)
             .withMaxEntrySize(maxEntrySize)
             .withMaxSegmentSize(maxSegmentSize)
-            .build(),
-            JournalSerdes.builder()
-                .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
-                .build().toMapper());
+            .build(), serdes.toReadMapper(), serdes.toWriteMapper());
     }
 
     @Override
index 89d9c64c25fdc33a29ad30e71ae5c7e651073620..49cfaf080cb7e9fa37afc435bd69e5421635823f 100644 (file)
@@ -23,11 +23,13 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Stopwatch;
+import io.atomix.storage.journal.FromByteBufMapper;
 import io.atomix.storage.journal.Indexed;
 import io.atomix.storage.journal.JournalSerdes;
 import io.atomix.storage.journal.SegmentedByteBufJournal;
 import io.atomix.storage.journal.SegmentedJournal;
 import io.atomix.storage.journal.StorageLevel;
+import io.atomix.storage.journal.ToByteBufMapper;
 import java.io.File;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -283,10 +285,18 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
-    private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
-        .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
-        .build();
     private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+    private static final FromByteBufMapper<Long> READ_MAPPER;
+    private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+    static {
+        final var namespace = JournalSerdes.builder()
+            .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+            .build();
+
+        READ_MAPPER = namespace.toReadMapper();
+        WRITE_MAPPER = namespace.toWriteMapper();
+    }
 
     private final String persistenceId;
     private final StorageLevel storage;
@@ -499,7 +509,7 @@ abstract sealed class SegmentedJournalActor extends AbstractActor {
             .withDirectory(directory)
             .withName("delete")
             .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
-            .build(), DELETE_NAMESPACE.toMapper());
+            .build(), READ_MAPPER, WRITE_MAPPER);
         final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
             .tryNext((index, value, length) -> value);
         lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;