X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=third-party%2Fatomix%2Fstorage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Futils%2Fserializer%2FNamespace.java;h=9dcbded38bb6b4f19a6a56d5e7a4ad20ba2c052d;hb=39fda828273d60937eaaff4d81183855343cbc57;hp=ff2afd09618a11b95c6dabcd9a30e89ab4954eef;hpb=42f5d41b96a3767c44df0e2d0ce9e78e6832b1b6;p=controller.git diff --git a/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java b/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java index ff2afd0961..9dcbded38b 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/Namespace.java @@ -1,5 +1,6 @@ /* - * Copyright 2014-present Open Networking Foundation + * Copyright 2014-2021 Open Networking Foundation + * Copyright 2023 PANTHEON.tech, s.r.o. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +16,8 @@ */ package io.atomix.utils.serializer; +import static java.util.Objects.requireNonNull; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Registration; import com.esotericsoftware.kryo.Serializer; @@ -25,10 +28,7 @@ import com.esotericsoftware.kryo.pool.KryoFactory; import com.esotericsoftware.kryo.pool.KryoPool; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; -import org.objenesis.strategy.StdInstantiatorStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.atomix.storage.journal.JournalSerdes; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.OutputStream; @@ -39,474 +39,330 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; - -import static java.util.Objects.requireNonNull; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Pool of Kryo instances, with classes pre-registered. */ -//@ThreadSafe -public final class Namespace implements KryoFactory, KryoPool { - - /** - * Default buffer size used for serialization. - * - * @see #serialize(Object) - */ - public static final int DEFAULT_BUFFER_SIZE = 4096; - - /** - * ID to use if this KryoNamespace does not define registration id. - */ - private static final int FLOATING_ID = -1; - - /** - * Smallest ID free to use for user defined registrations. - */ - private static final int INITIAL_ID = 16; - - static final String NO_NAME = "(no name)"; - - private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class); - - /** - * Default Kryo namespace. - */ - public static final Namespace DEFAULT = builder().build(); - - private final KryoPool kryoPool = new KryoPool.Builder(this) - .softReferences() - .build(); - - private final KryoOutputPool kryoOutputPool = new KryoOutputPool(); - private final KryoInputPool kryoInputPool = new KryoInputPool(); - - private final ImmutableList registeredBlocks; - - private final ClassLoader classLoader; - private final boolean registrationRequired; - private final String friendlyName; - - /** - * KryoNamespace builder. - */ - //@NotThreadSafe - public static final class Builder { - private int blockHeadId = INITIAL_ID; - private List[], Serializer>> types = new ArrayList<>(); - private List blocks = new ArrayList<>(); - private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - private boolean registrationRequired = true; - +public final class Namespace implements JournalSerdes, KryoFactory, KryoPool { /** - * Builds a {@link Namespace} instance. + * Default buffer size used for serialization. * - * @return KryoNamespace + * @see #serialize(Object) */ - public Namespace build() { - return build(NO_NAME); - } + private static final int DEFAULT_BUFFER_SIZE = 4096; /** - * Builds a {@link Namespace} instance. - * - * @param friendlyName friendly name for the namespace - * @return KryoNamespace + * Smallest ID free to use for user defined registrations. */ - public Namespace build(String friendlyName) { - if (!types.isEmpty()) { - blocks.add(new RegistrationBlock(this.blockHeadId, types)); - } - return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1); - } + private static final int INITIAL_ID = 16; + + private static final String NO_NAME = "(no name)"; + + private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class); + + private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build(); + + private final KryoOutputPool kryoOutputPool = new KryoOutputPool(); + private final KryoInputPool kryoInputPool = new KryoInputPool(); + + private final ImmutableList registeredBlocks; + + private final ClassLoader classLoader; + private final String friendlyName; /** - * Registers serializer for the given set of classes. - *

- * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees - * all instances will be serialized with the same type ID. - * - * @param classes list of classes to register - * @param serializer serializer to use for the class - * @return this + * KryoNamespace builder. */ - public Builder register(Serializer serializer, final Class... classes) { - types.add(Map.entry(classes, serializer)); - return this; + private static final class Builder implements JournalSerdes.Builder { + private final int blockHeadId = INITIAL_ID; + private final List[], EntrySerializer>> types = new ArrayList<>(); + private final List blocks = new ArrayList<>(); + private ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + @Override + public Builder register(final EntrySerdes serdes, final Class... classes) { + types.add(Map.entry(classes, new EntrySerializer<>(serdes))); + return this; + } + + @Override + public Builder setClassLoader(final ClassLoader classLoader) { + this.classLoader = requireNonNull(classLoader); + return this; + } + + @Override + public JournalSerdes build() { + return build(NO_NAME); + } + + @Override + public JournalSerdes build(final String friendlyName) { + if (!types.isEmpty()) { + blocks.add(new RegistrationBlock(blockHeadId, types)); + } + return new Namespace(blocks, classLoader, friendlyName); + } } /** - * Sets the namespace class loader. + * Creates a new {@link Namespace} builder. * - * @param classLoader the namespace class loader - * @return the namespace builder + * @return builder */ - public Builder setClassLoader(ClassLoader classLoader) { - this.classLoader = classLoader; - return this; + public static JournalSerdes.Builder builder() { + return new Builder(); } /** - * Sets the registrationRequired flag. + * Creates a Kryo instance pool. * - * @param registrationRequired Kryo's registrationRequired flag - * @return this - * @see Kryo#setRegistrationRequired(boolean) + * @param registeredTypes types to register + * @param registrationRequired whether registration is required + * @param friendlyName friendly name for the namespace */ - public Builder setRegistrationRequired(boolean registrationRequired) { - this.registrationRequired = registrationRequired; - return this; + private Namespace( + final List registeredTypes, + final ClassLoader classLoader, + final String friendlyName) { + registeredBlocks = ImmutableList.copyOf(registeredTypes); + this.classLoader = classLoader; + this.friendlyName = requireNonNull(friendlyName); + + // Pre-populate with a single instance + release(create()); } - } - - /** - * Creates a new {@link Namespace} builder. - * - * @return builder - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Creates a Kryo instance pool. - * - * @param registeredTypes types to register - * @param registrationRequired whether registration is required - * @param friendlyName friendly name for the namespace - */ - private Namespace( - final List registeredTypes, - ClassLoader classLoader, - boolean registrationRequired, - String friendlyName) { - this.registeredBlocks = ImmutableList.copyOf(registeredTypes); - this.registrationRequired = registrationRequired; - this.classLoader = classLoader; - this.friendlyName = requireNonNull(friendlyName); - } - - /** - * Populates the Kryo pool. - * - * @param instances to add to the pool - * @return this - */ - public Namespace populate(int instances) { - - for (int i = 0; i < instances; ++i) { - release(create()); + + @Override + public byte[] serialize(final Object obj) { + return serialize(obj, DEFAULT_BUFFER_SIZE); } - return this; - } - - /** - * Serializes given object to byte array using Kryo instance in pool. - *

- * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}. - * - * @param obj Object to serialize - * @return serialized bytes - */ - public byte[] serialize(final Object obj) { - return serialize(obj, DEFAULT_BUFFER_SIZE); - } - - /** - * Serializes given object to byte array using Kryo instance in pool. - * - * @param obj Object to serialize - * @param bufferSize maximum size of serialized bytes - * @return serialized bytes - */ - public byte[] serialize(final Object obj, final int bufferSize) { - return kryoOutputPool.run(output -> { - return kryoPool.run(kryo -> { - kryo.writeClassAndObject(output, obj); - output.flush(); - return output.getByteArrayOutputStream().toByteArray(); - }); - }, bufferSize); - } - - /** - * Serializes given object to byte buffer using Kryo instance in pool. - * - * @param obj Object to serialize - * @param buffer to write to - */ - public void serialize(final Object obj, final ByteBuffer buffer) { - ByteBufferOutput out = new ByteBufferOutput(buffer); - Kryo kryo = borrow(); - try { - kryo.writeClassAndObject(out, obj); - out.flush(); - } finally { - release(kryo); + + @Override + public byte[] serialize(final Object obj, final int bufferSize) { + return kryoOutputPool.run(output -> kryoPool.run(kryo -> { + kryo.writeClassAndObject(output, obj); + output.flush(); + return output.getByteArrayOutputStream().toByteArray(); + }), bufferSize); } - } - - /** - * Serializes given object to OutputStream using Kryo instance in pool. - * - * @param obj Object to serialize - * @param stream to write to - */ - public void serialize(final Object obj, final OutputStream stream) { - serialize(obj, stream, DEFAULT_BUFFER_SIZE); - } - - /** - * Serializes given object to OutputStream using Kryo instance in pool. - * - * @param obj Object to serialize - * @param stream to write to - * @param bufferSize size of the buffer in front of the stream - */ - public void serialize(final Object obj, final OutputStream stream, final int bufferSize) { - ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize); - Kryo kryo = borrow(); - try { - kryo.writeClassAndObject(out, obj); - out.flush(); - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final ByteBuffer buffer) { + ByteBufferOutput out = new ByteBufferOutput(buffer); + Kryo kryo = borrow(); + try { + kryo.writeClassAndObject(out, obj); + out.flush(); + } finally { + release(kryo); + } } - } - - /** - * Deserializes given byte array to Object using Kryo instance in pool. - * - * @param bytes serialized bytes - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final byte[] bytes) { - return kryoInputPool.run(input -> { - input.setInputStream(new ByteArrayInputStream(bytes)); - return kryoPool.run(kryo -> { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(input); - return obj; - }); - }, DEFAULT_BUFFER_SIZE); - } - - /** - * Deserializes given byte buffer to Object using Kryo instance in pool. - * - * @param buffer input with serialized bytes - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final ByteBuffer buffer) { - ByteBufferInput in = new ByteBufferInput(buffer); - Kryo kryo = borrow(); - try { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(in); - return obj; - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final OutputStream stream) { + serialize(obj, stream, DEFAULT_BUFFER_SIZE); } - } - - /** - * Deserializes given InputStream to an Object using Kryo instance in pool. - * - * @param stream input stream - * @param deserialized Object type - * @return deserialized Object - */ - public T deserialize(final InputStream stream) { - return deserialize(stream, DEFAULT_BUFFER_SIZE); - } - - /** - * Deserializes given InputStream to an Object using Kryo instance in pool. - * - * @param stream input stream - * @param deserialized Object type - * @param bufferSize size of the buffer in front of the stream - * @return deserialized Object - */ - public T deserialize(final InputStream stream, final int bufferSize) { - ByteBufferInput in = new ByteBufferInput(stream, bufferSize); - Kryo kryo = borrow(); - try { - @SuppressWarnings("unchecked") - T obj = (T) kryo.readClassAndObject(in); - return obj; - } finally { - release(kryo); + + @Override + public void serialize(final Object obj, final OutputStream stream, final int bufferSize) { + ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize); + Kryo kryo = borrow(); + try { + kryo.writeClassAndObject(out, obj); + out.flush(); + } finally { + release(kryo); + } } - } - - private String friendlyName() { - return friendlyName; - } - - /** - * Gets the number of classes registered in this Kryo namespace. - * - * @return size of namespace - */ - public int size() { - return (int) registeredBlocks.stream() - .flatMap(block -> block.types().stream()) - .count(); - } - - /** - * Creates a Kryo instance. - * - * @return Kryo instance - */ - @Override - public Kryo create() { - LOGGER.trace("Creating Kryo instance for {}", this); - Kryo kryo = new Kryo(); - kryo.setClassLoader(classLoader); - kryo.setRegistrationRequired(registrationRequired); - - // TODO rethink whether we want to use StdInstantiatorStrategy - kryo.setInstantiatorStrategy( - new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); - - for (RegistrationBlock block : registeredBlocks) { - int id = block.begin(); - if (id == FLOATING_ID) { - id = kryo.getNextRegistrationId(); - } - for (Entry[], Serializer> entry : block.types()) { - register(kryo, entry.getKey(), entry.getValue(), id++); - } + + @Override + public T deserialize(final byte[] bytes) { + return kryoInputPool.run(input -> { + input.setInputStream(new ByteArrayInputStream(bytes)); + return kryoPool.run(kryo -> { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(input); + return obj; + }); + }, DEFAULT_BUFFER_SIZE); } - return kryo; - } - - /** - * Register {@code type} and {@code serializer} to {@code kryo} instance. - * - * @param kryo Kryo instance - * @param types types to register - * @param serializer Specific serializer to register or null to use default. - * @param id type registration id to use - */ - private void register(Kryo kryo, Class[] types, Serializer serializer, int id) { - Registration existing = kryo.getRegistration(id); - if (existing != null) { - boolean matches = false; - for (Class type : types) { - if (existing.getType() == type) { - matches = true; - break; + + @Override + public T deserialize(final ByteBuffer buffer) { + ByteBufferInput in = new ByteBufferInput(buffer); + Kryo kryo = borrow(); + try { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(in); + return obj; + } finally { + release(kryo); } - } - - if (!matches) { - LOGGER.error("{}: Failed to register {} as {}, {} was already registered.", - friendlyName(), types, id, existing.getType()); - - throw new IllegalStateException(String.format( - "Failed to register %s as %s, %s was already registered.", - Arrays.toString(types), id, existing.getType())); - } - // falling through to register call for now. - // Consider skipping, if there's reasonable - // way to compare serializer equivalence. } - for (Class type : types) { - Registration r = null; - if (serializer == null) { - r = kryo.register(type, id); - } else if (type.isInterface()) { - kryo.addDefaultSerializer(type, serializer); - } else { - r = kryo.register(type, serializer, id); - } - if (r != null) { - if (r.getId() != id) { - LOGGER.debug("{}: {} already registered as {}. Skipping {}.", - friendlyName(), r.getType(), r.getId(), id); - } - LOGGER.trace("{} registered as {}", r.getType(), r.getId()); - } + @Override + public T deserialize(final InputStream stream) { + return deserialize(stream, DEFAULT_BUFFER_SIZE); } - } - - @Override - public Kryo borrow() { - return kryoPool.borrow(); - } - - @Override - public void release(Kryo kryo) { - kryoPool.release(kryo); - } - - @Override - public T run(KryoCallback callback) { - return kryoPool.run(callback); - } - - @Override - public String toString() { - if (!NO_NAME.equals(friendlyName)) { - return MoreObjects.toStringHelper(getClass()) - .omitNullValues() - .add("friendlyName", friendlyName) - // omit lengthy detail, when there's a name - .toString(); + + @Override + public T deserialize(final InputStream stream, final int bufferSize) { + ByteBufferInput in = new ByteBufferInput(stream, bufferSize); + Kryo kryo = borrow(); + try { + @SuppressWarnings("unchecked") + T obj = (T) kryo.readClassAndObject(in); + return obj; + } finally { + release(kryo); + } } - return MoreObjects.toStringHelper(getClass()) - .add("registeredBlocks", registeredBlocks) - .toString(); - } - - static final class RegistrationBlock { - private final int begin; - private final ImmutableList[], Serializer>> types; - - RegistrationBlock(int begin, List[], Serializer>> types) { - this.begin = begin; - this.types = ImmutableList.copyOf(types); + + /** + * Creates a Kryo instance. + * + * @return Kryo instance + */ + @Override + public Kryo create() { + LOGGER.trace("Creating Kryo instance for {}", this); + Kryo kryo = new Kryo(); + kryo.setClassLoader(classLoader); + kryo.setRegistrationRequired(true); + + // TODO rethink whether we want to use StdInstantiatorStrategy + kryo.setInstantiatorStrategy( + new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + + for (RegistrationBlock block : registeredBlocks) { + int id = block.begin(); + for (Entry[], EntrySerializer> entry : block.types()) { + register(kryo, entry.getKey(), entry.getValue(), id++); + } + } + return kryo; } - public int begin() { - return begin; + /** + * Register {@code type} and {@code serializer} to {@code kryo} instance. + * + * @param kryo Kryo instance + * @param types types to register + * @param serializer Specific serializer to register or null to use default. + * @param id type registration id to use + */ + private void register(final Kryo kryo, final Class[] types, final Serializer serializer, final int id) { + Registration existing = kryo.getRegistration(id); + if (existing != null) { + boolean matches = false; + for (Class type : types) { + if (existing.getType() == type) { + matches = true; + break; + } + } + + if (!matches) { + LOGGER.error("{}: Failed to register {} as {}, {} was already registered.", + friendlyName, types, id, existing.getType()); + + throw new IllegalStateException(String.format( + "Failed to register %s as %s, %s was already registered.", + Arrays.toString(types), id, existing.getType())); + } + // falling through to register call for now. + // Consider skipping, if there's reasonable + // way to compare serializer equivalence. + } + + for (Class type : types) { + Registration r = null; + if (serializer == null) { + r = kryo.register(type, id); + } else if (type.isInterface()) { + kryo.addDefaultSerializer(type, serializer); + } else { + r = kryo.register(type, serializer, id); + } + if (r != null) { + if (r.getId() != id) { + LOGGER.debug("{}: {} already registered as {}. Skipping {}.", + friendlyName, r.getType(), r.getId(), id); + } + LOGGER.trace("{} registered as {}", r.getType(), r.getId()); + } + } } - public ImmutableList[], Serializer>> types() { - return types; + @Override + public Kryo borrow() { + return kryoPool.borrow(); } @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("begin", begin) - .add("types", types) - .toString(); + public void release(final Kryo kryo) { + kryoPool.release(kryo); } @Override - public int hashCode() { - return types.hashCode(); + public T run(final KryoCallback callback) { + return kryoPool.run(callback); } - // Only the registered types are used for equality. @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (obj instanceof RegistrationBlock) { - RegistrationBlock that = (RegistrationBlock) obj; - return Objects.equals(this.types, that.types); - } - return false; + public String toString() { + if (!NO_NAME.equals(friendlyName)) { + return MoreObjects.toStringHelper(getClass()) + .omitNullValues() + .add("friendlyName", friendlyName) + // omit lengthy detail, when there's a name + .toString(); + } + return MoreObjects.toStringHelper(getClass()).add("registeredBlocks", registeredBlocks).toString(); + } + + static final class RegistrationBlock { + private final int begin; + private final ImmutableList[], EntrySerializer>> types; + + RegistrationBlock(final int begin, final List[], EntrySerializer>> types) { + this.begin = begin; + this.types = ImmutableList.copyOf(types); + } + + public int begin() { + return begin; + } + + public ImmutableList[], EntrySerializer>> types() { + return types; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()).add("begin", begin).add("types", types).toString(); + } + + @Override + public int hashCode() { + return types.hashCode(); + } + + // Only the registered types are used for equality. + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + + if (obj instanceof RegistrationBlock that) { + return Objects.equals(types, that.types); + } + return false; + } } - } }