From a6a5fe176855c600d957dad4bf0dd74dbcf24a55 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 2 Mar 2023 18:38:30 +0100 Subject: [PATCH] Introduce atomix.storage.journal.JournalSerdes The interfaces are tightly coupled with namespace implementation, which in turn is tightly coupled to Kryo. As a first step to disconnect these three, introduce JournalSerdes, which acts as the public API implemented by Namespace. This allows us to hide atomix.utils.serializer package at least from OSGi. Change-Id: I9668ba2eb3e0a58d7ed27ee09ec1bb3cee069eb4 Signed-off-by: Robert Varga --- .../akka/segjournal/DataJournalV0.java | 4 +- .../segjournal/SegmentedJournalActor.java | 4 +- third-party/atomix/storage/pom.xml | 1 - .../FileChannelJournalSegmentReader.java | 6 +- .../FileChannelJournalSegmentWriter.java | 5 +- .../storage/journal/JournalSegment.java | 6 +- .../atomix/storage/journal/JournalSerdes.java | 156 +++++ .../journal/MappableJournalSegmentReader.java | 6 +- .../journal/MappableJournalSegmentWriter.java | 6 +- .../journal/MappedJournalSegmentReader.java | 6 +- .../journal/MappedJournalSegmentWriter.java | 5 +- .../storage/journal/SegmentedJournal.java | 10 +- .../io/atomix/utils/serializer/Namespace.java | 659 ++++++++---------- .../storage/journal/AbstractJournalTest.java | 41 +- 14 files changed, 485 insertions(+), 430 deletions(-) create mode 100644 third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index 93c93f8acc..5eb688a615 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -14,11 +14,11 @@ import akka.persistence.AtomicWrite; import akka.persistence.PersistentRepr; import com.codahale.metrics.Histogram; import io.atomix.storage.journal.Indexed; +import io.atomix.storage.journal.JournalSerdes; import io.atomix.storage.journal.SegmentedJournal; import io.atomix.storage.journal.SegmentedJournalReader; import io.atomix.storage.journal.SegmentedJournalWriter; import io.atomix.storage.journal.StorageLevel; -import io.atomix.utils.serializer.Namespace; import java.io.File; import java.io.Serializable; import java.util.List; @@ -45,7 +45,7 @@ final class DataJournalV0 extends DataJournal { super(persistenceId, messageSize); entries = SegmentedJournal.builder() .withStorageLevel(storage).withDirectory(directory).withName("data") - .withNamespace(Namespace.builder() + .withNamespace(JournalSerdes.builder() .register(new DataJournalEntrySerializer(system), FromPersistence.class, ToPersistence.class) .build()) .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize) diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java index 67c87f1658..b292ee1bf6 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.java @@ -22,10 +22,10 @@ import com.esotericsoftware.kryo.serializers.DefaultSerializers.LongSerializer; import com.google.common.base.MoreObjects; import com.google.common.base.Stopwatch; import io.atomix.storage.journal.Indexed; +import io.atomix.storage.journal.JournalSerdes; import io.atomix.storage.journal.SegmentedJournal; import io.atomix.storage.journal.SegmentedJournalWriter; import io.atomix.storage.journal.StorageLevel; -import io.atomix.utils.serializer.Namespace; import java.io.File; import java.util.ArrayList; import java.util.List; @@ -148,7 +148,7 @@ final class SegmentedJournalActor extends AbstractActor { } private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class); - private static final Namespace DELETE_NAMESPACE = Namespace.builder() + private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder() .register(new LongSerializer(), Long.class) .build(); private static final int DELETE_SEGMENT_SIZE = 64 * 1024; diff --git a/third-party/atomix/storage/pom.xml b/third-party/atomix/storage/pom.xml index 8bbcb7c523..f05cd2e6e1 100644 --- a/third-party/atomix/storage/pom.xml +++ b/third-party/atomix/storage/pom.xml @@ -129,7 +129,6 @@ io.atomix.storage.journal, - io.atomix.utils.serializer, com.esotericsoftware.kryo.*;version=4.0.2 diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java index 98133546cf..696ab45cde 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentReader.java @@ -17,8 +17,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; -import io.atomix.utils.serializer.Namespace; - import java.io.IOException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -36,7 +34,7 @@ final class FileChannelJournalSegmentReader implements JournalReader { private final FileChannel channel; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private final ByteBuffer memory; private final long firstIndex; private Indexed currentEntry; @@ -47,7 +45,7 @@ final class FileChannelJournalSegmentReader implements JournalReader { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.channel = channel; this.maxEntrySize = maxEntrySize; this.index = index; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java index b885c1a026..ba99cf3c7e 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/FileChannelJournalSegmentWriter.java @@ -17,7 +17,6 @@ package io.atomix.storage.journal; import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; -import io.atomix.utils.serializer.Namespace; import java.io.IOException; import java.nio.BufferOverflowException; @@ -47,7 +46,7 @@ class FileChannelJournalSegmentWriter implements JournalWriter { private final JournalSegment segment; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private final ByteBuffer memory; private final long firstIndex; private Indexed lastEntry; @@ -57,7 +56,7 @@ class FileChannelJournalSegmentWriter implements JournalWriter { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.channel = channel; this.segment = segment; this.maxEntrySize = maxEntrySize; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSegment.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSegment.java index aff124ea71..9a75973e7d 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSegment.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSegment.java @@ -17,8 +17,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.SparseJournalIndex; -import io.atomix.utils.serializer.Namespace; - import java.io.File; import java.io.IOException; import java.nio.MappedByteBuffer; @@ -43,7 +41,7 @@ public class JournalSegment implements AutoCloseable { private final StorageLevel storageLevel; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private final MappableJournalSegmentWriter writer; private final Set> readers = ConcurrentHashMap.newKeySet(); private final AtomicInteger references = new AtomicInteger(); @@ -55,7 +53,7 @@ public class JournalSegment implements AutoCloseable { StorageLevel storageLevel, int maxEntrySize, double indexDensity, - Namespace namespace) { + JournalSerdes namespace) { this.file = file; this.descriptor = descriptor; this.storageLevel = storageLevel; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java new file mode 100644 index 0000000000..af9ccb56f4 --- /dev/null +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/JournalSerdes.java @@ -0,0 +1,156 @@ +/* + * Copyright 2014-2021 Open Networking Foundation + * Copyright 2023 PANTHEON.tech, s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.storage.journal; + +import com.esotericsoftware.kryo.Serializer; +import io.atomix.utils.serializer.Namespace; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Support for serialization of {@link Journal} entries. + */ +public interface JournalSerdes { + /** + * Serializes given object to byte array. + * + * @param obj Object to serialize + * @return serialized bytes + */ + byte[] serialize(Object obj); + + /** + * Serializes given object to byte array. + * + * @param obj Object to serialize + * @param bufferSize maximum size of serialized bytes + * @return serialized bytes + */ + byte[] serialize(Object obj, int bufferSize); + + /** + * Serializes given object to byte buffer. + * + * @param obj Object to serialize + * @param buffer to write to + */ + void serialize(Object obj, ByteBuffer buffer); + + /** + * Serializes given object to OutputStream. + * + * @param obj Object to serialize + * @param stream to write to + */ + void serialize(Object obj, OutputStream stream); + + /** + * Serializes given object to OutputStream. + * + * @param obj Object to serialize + * @param stream to write to + * @param bufferSize size of the buffer in front of the stream + */ + void serialize(Object obj, OutputStream stream, int bufferSize); + + /** + * Deserializes given byte array to Object. + * + * @param bytes serialized bytes + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(byte[] bytes); + + /** + * Deserializes given byte buffer to Object. + * + * @param buffer input with serialized bytes + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(final ByteBuffer buffer); + + /** + * Deserializes given InputStream to an Object. + * + * @param stream input stream + * @param deserialized Object type + * @return deserialized Object + */ + T deserialize(InputStream stream); + + /** + * Deserializes given InputStream to an Object. + * + * @param stream input stream + * @param deserialized Object type + * @param bufferSize size of the buffer in front of the stream + * @return deserialized Object + */ + T deserialize(final InputStream stream, final int bufferSize); + + /** + * Creates a new {@link JournalSerdes} builder. + * + * @return builder + */ + static Builder builder() { + return Namespace.builder(); + } + + /** + * Builder for {@link JournalSerdes}. + */ + interface Builder { + /** + * Builds a {@link JournalSerdes} instance. + * + * @return A {@link JournalSerdes} implementation. + */ + JournalSerdes build(); + + /** + * Builds a {@link JournalSerdes} instance. + * + * @param friendlyName friendly name for the namespace + * @return A {@link JournalSerdes} implementation. + */ + JournalSerdes build(String friendlyName); + + /** + * Registers serializer for the given set of classes. + *

+ * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees + * all instances will be serialized with the same type ID. + * + * @param classes list of classes to register + * @param serializer serializer to use for the class + * @return this builder + */ + Builder register(Serializer serializer, Class... classes); + + /** + * Sets the namespace class loader. + * + * @param classLoader the namespace class loader + * @return this builder + */ + Builder setClassLoader(ClassLoader classLoader); + } +} diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java index 304e88b5ff..c8aa8fcd40 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentReader.java @@ -16,8 +16,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; -import io.atomix.utils.serializer.Namespace; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -30,7 +28,7 @@ class MappableJournalSegmentReader implements JournalReader { private final FileChannel channel; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private JournalReader reader; MappableJournalSegmentReader( @@ -38,7 +36,7 @@ class MappableJournalSegmentReader implements JournalReader { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.channel = channel; this.segment = segment; this.maxEntrySize = maxEntrySize; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java index 81211e296c..f19783a9c4 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappableJournalSegmentWriter.java @@ -16,8 +16,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; -import io.atomix.utils.serializer.Namespace; - import java.io.IOException; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -30,7 +28,7 @@ class MappableJournalSegmentWriter implements JournalWriter { private final JournalSegment segment; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private JournalWriter writer; MappableJournalSegmentWriter( @@ -38,7 +36,7 @@ class MappableJournalSegmentWriter implements JournalWriter { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.channel = channel; this.segment = segment; this.maxEntrySize = maxEntrySize; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java index 7ad7c219bc..1a33701082 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentReader.java @@ -17,8 +17,6 @@ package io.atomix.storage.journal; import io.atomix.storage.journal.index.JournalIndex; import io.atomix.storage.journal.index.Position; -import io.atomix.utils.serializer.Namespace; - import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.NoSuchElementException; @@ -33,7 +31,7 @@ final class MappedJournalSegmentReader implements JournalReader { private final ByteBuffer buffer; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private final long firstIndex; private Indexed currentEntry; private Indexed nextEntry; @@ -43,7 +41,7 @@ final class MappedJournalSegmentReader implements JournalReader { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.buffer = buffer.slice(); this.maxEntrySize = maxEntrySize; this.index = index; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java index 7eca961103..d461b81aec 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/MappedJournalSegmentWriter.java @@ -17,7 +17,6 @@ package io.atomix.storage.journal; import com.esotericsoftware.kryo.KryoException; import io.atomix.storage.journal.index.JournalIndex; -import io.atomix.utils.serializer.Namespace; import java.io.IOException; import java.nio.BufferOverflowException; @@ -47,7 +46,7 @@ class MappedJournalSegmentWriter implements JournalWriter { private final JournalSegment segment; private final int maxEntrySize; private final JournalIndex index; - private final Namespace namespace; + private final JournalSerdes namespace; private final long firstIndex; private Indexed lastEntry; @@ -56,7 +55,7 @@ class MappedJournalSegmentWriter implements JournalWriter { JournalSegment segment, int maxEntrySize, JournalIndex index, - Namespace namespace) { + JournalSerdes namespace) { this.mappedBuffer = buffer; this.buffer = buffer.slice(); this.segment = segment; diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 893ae4f944..f2d99eec53 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -29,8 +29,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; - -import io.atomix.utils.serializer.Namespace; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +56,7 @@ public final class SegmentedJournal implements Journal { private final String name; private final StorageLevel storageLevel; private final File directory; - private final Namespace namespace; + private final JournalSerdes namespace; private final int maxSegmentSize; private final int maxEntrySize; private final int maxEntriesPerSegment; @@ -77,7 +75,7 @@ public final class SegmentedJournal implements Journal { String name, StorageLevel storageLevel, File directory, - Namespace namespace, + JournalSerdes namespace, int maxSegmentSize, int maxEntrySize, int maxEntriesPerSegment, @@ -677,7 +675,7 @@ public final class SegmentedJournal implements Journal { private String name = DEFAULT_NAME; private StorageLevel storageLevel = StorageLevel.DISK; private File directory = new File(DEFAULT_DIRECTORY); - private Namespace namespace; + private JournalSerdes namespace; private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; @@ -744,7 +742,7 @@ public final class SegmentedJournal implements Journal { * @param namespace The journal serializer. * @return The journal builder. */ - public Builder withNamespace(Namespace namespace) { + public Builder withNamespace(JournalSerdes namespace) { this.namespace = requireNonNull(namespace, "namespace cannot be null"); return this; } diff --git a/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java b/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java index 4ff322518c..3cab13d997 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java @@ -1,5 +1,6 @@ /* - * Copyright 2014-present Open Networking Foundation + * Copyright 2014-2021 Open Networking Foundation + * Copyright 2023 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +16,8 @@ */ package io.atomix.utils.serializer; +import static java.util.Objects.requireNonNull; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Registration; import com.esotericsoftware.kryo.Serializer; @@ -25,10 +28,7 @@ import com.esotericsoftware.kryo.pool.KryoFactory; import com.esotericsoftware.kryo.pool.KryoPool; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; -import org.objenesis.strategy.StdInstantiatorStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.atomix.storage.journal.JournalSerdes; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.OutputStream; @@ -39,423 +39,338 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; - -import static java.util.Objects.requireNonNull; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Pool of Kryo instances, with classes pre-registered. */ -//@ThreadSafe -public final class Namespace implements KryoFactory, KryoPool { - - /** - * Default buffer size used for serialization. - * - * @see #serialize(Object) - */ - public static final int DEFAULT_BUFFER_SIZE = 4096; - - /** - * ID to use if this KryoNamespace does not define registration id. - */ - private static final int FLOATING_ID = -1; +public final class Namespace implements JournalSerdes, KryoFactory, KryoPool { + /** + * Default buffer size used for serialization. + * + * @see #serialize(Object) + */ + private static final int DEFAULT_BUFFER_SIZE = 4096; - /** - * Smallest ID free to use for user defined registrations. - */ - private static final int INITIAL_ID = 16; + /** + * ID to use if this KryoNamespace does not define registration id. + */ + private static final int FLOATING_ID = -1; - static final String NO_NAME = "(no name)"; + /** + * Smallest ID free to use for user defined registrations. + */ + private static final int INITIAL_ID = 16; - private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class); + private static final String NO_NAME = "(no name)"; - private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build(); + private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class); - private final KryoOutputPool kryoOutputPool = new KryoOutputPool(); - private final KryoInputPool kryoInputPool = new KryoInputPool(); + private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build(); - private final ImmutableList registeredBlocks; + private final KryoOutputPool kryoOutputPool = new KryoOutputPool(); + private final KryoInputPool kryoInputPool = new KryoInputPool(); - private final ClassLoader classLoader; - private final String friendlyName; + private final ImmutableList registeredBlocks; - /** - * KryoNamespace builder. - */ - //@NotThreadSafe - public static final class Builder { - private int blockHeadId = INITIAL_ID; - private List[], Serializer>> types = new ArrayList<>(); - private List blocks = new ArrayList<>(); - private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + private final ClassLoader classLoader; + private final String friendlyName; /** - * Builds a {@link Namespace} instance. - * - * @return KryoNamespace + * KryoNamespace builder. */ - public Namespace build() { - return build(NO_NAME); + private static final class Builder implements JournalSerdes.Builder { + private final int blockHeadId = INITIAL_ID; + private final List[], Serializer>> types = new ArrayList<>(); + private final List blocks = new ArrayList<>(); + private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + @Override + public Builder register(final Serializer serializer, final Class... classes) { + types.add(Map.entry(classes, serializer)); + return this; + } + + @Override + public Builder setClassLoader(final ClassLoader classLoader) { + this.classLoader = requireNonNull(classLoader); + return this; + } + + @Override + public JournalSerdes build() { + return build(NO_NAME); + } + + @Override + public JournalSerdes build(final String friendlyName) { + if (!types.isEmpty()) { + blocks.add(new RegistrationBlock(blockHeadId, types)); + } + return new Namespace(blocks, classLoader, friendlyName); + } } /** - * Builds a {@link Namespace} instance. + * Creates a new {@link Namespace} builder. * - * @param friendlyName friendly name for the namespace - * @return KryoNamespace + * @return builder */ - public Namespace build(String friendlyName) { - if (!types.isEmpty()) { - blocks.add(new RegistrationBlock(this.blockHeadId, types)); - } - return new Namespace(blocks, classLoader, friendlyName); + public static JournalSerdes.Builder builder() { + return new Builder(); } /** - * Registers serializer for the given set of classes. - *

- * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees - * all instances will be serialized with the same type ID. + * Creates a Kryo instance pool. * - * @param classes list of classes to register - * @param serializer serializer to use for the class - * @return this + * @param registeredTypes types to register + * @param registrationRequired whether registration is required + * @param friendlyName friendly name for the namespace */ - public Builder register(Serializer serializer, final Class... classes) { - types.add(Map.entry(classes, serializer)); - return this; + private Namespace( + final List registeredTypes, + final ClassLoader classLoader, + final String friendlyName) { + registeredBlocks = ImmutableList.copyOf(registeredTypes); + this.classLoader = classLoader; + this.friendlyName = requireNonNull(friendlyName); + + // Pre-populate with a single instance + release(create()); } - /** - * Sets the namespace class loader. - * - * @param classLoader the namespace class loader - * @return the namespace builder - */ - public Builder setClassLoader(ClassLoader classLoader) { - this.classLoader = classLoader; - return this; + @Override + public byte[] serialize(final Object obj) { + return serialize(obj, DEFAULT_BUFFER_SIZE); } - } - - /** - * Creates a new {@link Namespace} builder. - * - * @return builder - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Creates a Kryo instance pool. - * - * @param registeredTypes types to register - * @param registrationRequired whether registration is required - * @param friendlyName friendly name for the namespace - */ - private Namespace( - final List registeredTypes, - ClassLoader classLoader, - String friendlyName) { - this.registeredBlocks = ImmutableList.copyOf(registeredTypes); - this.classLoader = classLoader; - this.friendlyName = requireNonNull(friendlyName); - - // Pre-populate with a single instance - release(create()); - } - - /** - * Serializes given object to byte array using Kryo instance in pool. - * - * @param obj Object to serialize - * @return serialized bytes - */ - public byte[] serialize(final Object obj) { - return serialize(obj, DEFAULT_BUFFER_SIZE); - } - - /** - * Serializes given object to byte array using Kryo instance in pool. - * - * @param obj Object to serialize - * @param bufferSize maximum size of serialized bytes - * @return serialized bytes - */ - public byte[] serialize(final Object obj, final int bufferSize) { - return kryoOutputPool.run(output -> { - return kryoPool.run(kryo -> { - kryo.writeClassAndObject(output, obj); - output.flush(); - return output.getByteArrayOutputStream().toByteArray(); - }); - }, bufferSize); - } - - /** - * Serializes given object to byte buffer using Kryo instance in pool. - * - * @param obj Object to serialize - * @param buffer to write to - */ - public void serialize(final Object obj, final ByteBuffer buffer) { - ByteBufferOutput out = new ByteBufferOutput(buffer); - Kryo kryo = borrow(); - try { - kryo.writeClassAndObject(out, obj); - out.flush(); - } finally { - release(kryo); + + @Override + public byte[] serialize(final Object obj, final int bufferSize) { + return kryoOutputPool.run(output -> kryoPool.run(kryo -> { + kryo.writeClassAndObject(output, obj); + output.flush(); + return output.getByteArrayOutputStream().toByteArray(); + }), bufferSize); } - } - - /** - * Serializes given object to OutputStream using Kryo instance in pool. - * - * @param obj Object to serialize - * @param stream to write to - */ - public void serialize(final Object obj, final OutputStream stream) { - serialize(obj, stream, DEFAULT_BUFFER_SIZE); - } - - /** - * Serializes given object to OutputStream using Kryo instance in pool. - * - * @param obj Object to serialize - * @param stream to write to - * @param bufferSize size of the buffer in front of the stream - */ - public void serialize(final Object obj, final OutputStream stream, final int bufferSize) { - ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize); - Kryo kryo = borrow(); - try { - kryo.writeClassAndObject(out, obj); - out.flush(); - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final ByteBuffer buffer) { + ByteBufferOutput out = new ByteBufferOutput(buffer); + Kryo kryo = borrow(); + try { + kryo.writeClassAndObject(out, obj); + out.flush(); + } finally { + release(kryo); + } } - } - - /** - * Deserializes given byte array to Object using Kryo instance in pool. - * - * @param bytes serialized bytes - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final byte[] bytes) { - return kryoInputPool.run(input -> { - input.setInputStream(new ByteArrayInputStream(bytes)); - return kryoPool.run(kryo -> { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(input); - return obj; - }); - }, DEFAULT_BUFFER_SIZE); - } - - /** - * Deserializes given byte buffer to Object using Kryo instance in pool. - * - * @param buffer input with serialized bytes - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final ByteBuffer buffer) { - ByteBufferInput in = new ByteBufferInput(buffer); - Kryo kryo = borrow(); - try { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(in); - return obj; - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final OutputStream stream) { + serialize(obj, stream, DEFAULT_BUFFER_SIZE); } - } - - /** - * Deserializes given InputStream to an Object using Kryo instance in pool. - * - * @param stream input stream - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final InputStream stream) { - return deserialize(stream, DEFAULT_BUFFER_SIZE); - } - - /** - * Deserializes given InputStream to an Object using Kryo instance in pool. - * - * @param stream input stream - * @param deserialized Object type - * @param bufferSize size of the buffer in front of the stream - * @return deserialized Object - */ - public T deserialize(final InputStream stream, final int bufferSize) { - ByteBufferInput in = new ByteBufferInput(stream, bufferSize); - Kryo kryo = borrow(); - try { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(in); - return obj; - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final OutputStream stream, final int bufferSize) { + ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize); + Kryo kryo = borrow(); + try { + kryo.writeClassAndObject(out, obj); + out.flush(); + } finally { + release(kryo); + } } - } - - /** - * Creates a Kryo instance. - * - * @return Kryo instance - */ - @Override - public Kryo create() { - LOGGER.trace("Creating Kryo instance for {}", this); - Kryo kryo = new Kryo(); - kryo.setClassLoader(classLoader); - kryo.setRegistrationRequired(true); - - // TODO rethink whether we want to use StdInstantiatorStrategy - kryo.setInstantiatorStrategy( - new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); - - for (RegistrationBlock block : registeredBlocks) { - int id = block.begin(); - if (id == FLOATING_ID) { - id = kryo.getNextRegistrationId(); - } - for (Entry[], Serializer> entry : block.types()) { - register(kryo, entry.getKey(), entry.getValue(), id++); - } + + @Override + public T deserialize(final byte[] bytes) { + return kryoInputPool.run(input -> { + input.setInputStream(new ByteArrayInputStream(bytes)); + return kryoPool.run(kryo -> { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(input); + return obj; + }); + }, DEFAULT_BUFFER_SIZE); } - return kryo; - } - - /** - * Register {@code type} and {@code serializer} to {@code kryo} instance. - * - * @param kryo Kryo instance - * @param types types to register - * @param serializer Specific serializer to register or null to use default. - * @param id type registration id to use - */ - private void register(Kryo kryo, Class[] types, Serializer serializer, int id) { - Registration existing = kryo.getRegistration(id); - if (existing != null) { - boolean matches = false; - for (Class type : types) { - if (existing.getType() == type) { - matches = true; - break; + + @Override + public T deserialize(final ByteBuffer buffer) { + ByteBufferInput in = new ByteBufferInput(buffer); + Kryo kryo = borrow(); + try { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(in); + return obj; + } finally { + release(kryo); } - } - - if (!matches) { - LOGGER.error("{}: Failed to register {} as {}, {} was already registered.", - friendlyName, types, id, existing.getType()); - - throw new IllegalStateException(String.format( - "Failed to register %s as %s, %s was already registered.", - Arrays.toString(types), id, existing.getType())); - } - // falling through to register call for now. - // Consider skipping, if there's reasonable - // way to compare serializer equivalence. } - for (Class type : types) { - Registration r = null; - if (serializer == null) { - r = kryo.register(type, id); - } else if (type.isInterface()) { - kryo.addDefaultSerializer(type, serializer); - } else { - r = kryo.register(type, serializer, id); - } - if (r != null) { - if (r.getId() != id) { - LOGGER.debug("{}: {} already registered as {}. Skipping {}.", - friendlyName, r.getType(), r.getId(), id); - } - LOGGER.trace("{} registered as {}", r.getType(), r.getId()); - } + @Override + public T deserialize(final InputStream stream) { + return deserialize(stream, DEFAULT_BUFFER_SIZE); } - } - - @Override - public Kryo borrow() { - return kryoPool.borrow(); - } - - @Override - public void release(Kryo kryo) { - kryoPool.release(kryo); - } - - @Override - public T run(KryoCallback callback) { - return kryoPool.run(callback); - } - - @Override - public String toString() { - if (!NO_NAME.equals(friendlyName)) { - return MoreObjects.toStringHelper(getClass()) - .omitNullValues() - .add("friendlyName", friendlyName) - // omit lengthy detail, when there's a name - .toString(); + + @Override + public T deserialize(final InputStream stream, final int bufferSize) { + ByteBufferInput in = new ByteBufferInput(stream, bufferSize); + Kryo kryo = borrow(); + try { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(in); + return obj; + } finally { + release(kryo); + } } - return MoreObjects.toStringHelper(getClass()) - .add("registeredBlocks", registeredBlocks) - .toString(); - } - - static final class RegistrationBlock { - private final int begin; - private final ImmutableList[], Serializer>> types; - - RegistrationBlock(int begin, List[], Serializer>> types) { - this.begin = begin; - this.types = ImmutableList.copyOf(types); + + /** + * Creates a Kryo instance. + * + * @return Kryo instance + */ + @Override + public Kryo create() { + LOGGER.trace("Creating Kryo instance for {}", this); + Kryo kryo = new Kryo(); + kryo.setClassLoader(classLoader); + kryo.setRegistrationRequired(true); + + // TODO rethink whether we want to use StdInstantiatorStrategy + kryo.setInstantiatorStrategy( + new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + + for (RegistrationBlock block : registeredBlocks) { + int id = block.begin(); + if (id == FLOATING_ID) { + id = kryo.getNextRegistrationId(); + } + for (Entry[], Serializer> entry : block.types()) { + register(kryo, entry.getKey(), entry.getValue(), id++); + } + } + return kryo; } - public int begin() { - return begin; + /** + * Register {@code type} and {@code serializer} to {@code kryo} instance. + * + * @param kryo Kryo instance + * @param types types to register + * @param serializer Specific serializer to register or null to use default. + * @param id type registration id to use + */ + private void register(final Kryo kryo, final Class[] types, final Serializer serializer, final int id) { + Registration existing = kryo.getRegistration(id); + if (existing != null) { + boolean matches = false; + for (Class type : types) { + if (existing.getType() == type) { + matches = true; + break; + } + } + + if (!matches) { + LOGGER.error("{}: Failed to register {} as {}, {} was already registered.", + friendlyName, types, id, existing.getType()); + + throw new IllegalStateException(String.format( + "Failed to register %s as %s, %s was already registered.", + Arrays.toString(types), id, existing.getType())); + } + // falling through to register call for now. + // Consider skipping, if there's reasonable + // way to compare serializer equivalence. + } + + for (Class type : types) { + Registration r = null; + if (serializer == null) { + r = kryo.register(type, id); + } else if (type.isInterface()) { + kryo.addDefaultSerializer(type, serializer); + } else { + r = kryo.register(type, serializer, id); + } + if (r != null) { + if (r.getId() != id) { + LOGGER.debug("{}: {} already registered as {}. Skipping {}.", + friendlyName, r.getType(), r.getId(), id); + } + LOGGER.trace("{} registered as {}", r.getType(), r.getId()); + } + } } - public ImmutableList[], Serializer>> types() { - return types; + @Override + public Kryo borrow() { + return kryoPool.borrow(); } @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("begin", begin) - .add("types", types) - .toString(); + public void release(final Kryo kryo) { + kryoPool.release(kryo); } @Override - public int hashCode() { - return types.hashCode(); + public T run(final KryoCallback callback) { + return kryoPool.run(callback); } - // Only the registered types are used for equality. @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj instanceof RegistrationBlock) { - RegistrationBlock that = (RegistrationBlock) obj; - return Objects.equals(this.types, that.types); - } - return false; + public String toString() { + if (!NO_NAME.equals(friendlyName)) { + return MoreObjects.toStringHelper(getClass()) + .omitNullValues() + .add("friendlyName", friendlyName) + // omit lengthy detail, when there's a name + .toString(); + } + return MoreObjects.toStringHelper(getClass()).add("registeredBlocks", registeredBlocks).toString(); + } + + static final class RegistrationBlock { + private final int begin; + private final ImmutableList[], Serializer>> types; + + RegistrationBlock(final int begin, final List[], Serializer>> types) { + this.begin = begin; + this.types = ImmutableList.copyOf(types); + } + + public int begin() { + return begin; + } + + public ImmutableList[], Serializer>> types() { + return types; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()).add("begin", begin).add("types", types).toString(); + } + + @Override + public int hashCode() { + return types.hashCode(); + } + + // Only the registered types are used for equality. + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof RegistrationBlock that) { + return Objects.equals(types, that.types); + } + return false; + } } - } } diff --git a/third-party/atomix/storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java b/third-party/atomix/storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java index eaddd3d28d..07fcd76a6b 100644 --- a/third-party/atomix/storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java +++ b/third-party/atomix/storage/src/test/java/io/atomix/storage/journal/AbstractJournalTest.java @@ -1,5 +1,6 @@ /* - * Copyright 2017-present Open Networking Foundation + * Copyright 2017-2021 Open Networking Foundation + * Copyright 2023 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,14 +16,13 @@ */ package io.atomix.storage.journal; -import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer; -import io.atomix.utils.serializer.Namespace; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer; import java.io.IOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -32,12 +32,11 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Base journal test. @@ -46,7 +45,7 @@ import static org.junit.Assert.assertTrue; */ @RunWith(Parameterized.class) public abstract class AbstractJournalTest { - private static final Namespace NAMESPACE = Namespace.builder() + private static final JournalSerdes NAMESPACE = JournalSerdes.builder() .register(new TestEntrySerializer(), TestEntry.class) .register(new ByteArraySerializer(), byte[].class) .build(); @@ -57,10 +56,10 @@ public abstract class AbstractJournalTest { private final int maxSegmentSize; protected final int entriesPerSegment; - protected AbstractJournalTest(int maxSegmentSize) { + protected AbstractJournalTest(final int maxSegmentSize) { this.maxSegmentSize = maxSegmentSize; - int entryLength = (NAMESPACE.serialize(ENTRY).length + 8); - this.entriesPerSegment = (maxSegmentSize - 64) / entryLength; + int entryLength = NAMESPACE.serialize(ENTRY).length + 8; + entriesPerSegment = (maxSegmentSize - 64) / entryLength; } protected abstract StorageLevel storageLevel(); @@ -70,7 +69,7 @@ public abstract class AbstractJournalTest { List runs = new ArrayList<>(); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { - runs.add(new Object[]{64 + (i * (NAMESPACE.serialize(ENTRY).length + 8) + j)}); + runs.add(new Object[]{64 + i * (NAMESPACE.serialize(ENTRY).length + 8) + j}); } } return runs; @@ -373,13 +372,13 @@ public abstract class AbstractJournalTest { if (Files.exists(PATH)) { Files.walkFileTree(PATH, new SimpleFileVisitor() { @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException { Files.delete(file); return FileVisitResult.CONTINUE; } @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) throws IOException { Files.delete(dir); return FileVisitResult.CONTINUE; } -- 2.36.6