Rename Namespace to KryoJournalSerdes
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / KryoJournalSerdes.java
diff --git a/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java b/third-party/atomix/storage/src/main/java/io/atomix/utils/serializer/KryoJournalSerdes.java
new file mode 100644 (file)
index 0000000..d99c69d
--- /dev/null
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2014-2021 Open Networking Foundation
+ * Copyright 2023 PANTHEON.tech, s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.utils.serializer;
+
+import static java.util.Objects.requireNonNull;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.ByteBufferInput;
+import com.esotericsoftware.kryo.io.ByteBufferOutput;
+import com.esotericsoftware.kryo.pool.KryoCallback;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.google.common.base.MoreObjects;
+import io.atomix.storage.journal.JournalSerdes;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pool of Kryo instances, with classes pre-registered.
+ */
+final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
+    /**
+     * Default buffer size used for serialization.
+     *
+     * @see #serialize(Object)
+     */
+    private static final int DEFAULT_BUFFER_SIZE = 4096;
+
+    /**
+     * Smallest ID free to use for user defined registrations.
+     */
+    private static final int INITIAL_ID = 16;
+
+    static final String NO_NAME = "(no name)";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KryoJournalSerdes.class);
+
+    private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
+
+    private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
+    private final KryoInputPool kryoInputPool = new KryoInputPool();
+
+    private final List<RegisteredType> registeredTypes;
+    private final ClassLoader classLoader;
+    private final String friendlyName;
+
+    /**
+     * Creates a Kryo instance pool.
+     *
+     * @param registeredTypes      types to register
+     * @param registrationRequired whether registration is required
+     * @param friendlyName         friendly name for the namespace
+     */
+    KryoJournalSerdes(
+            final List<RegisteredType> registeredTypes,
+            final ClassLoader classLoader,
+            final String friendlyName) {
+        this.registeredTypes = List.copyOf(registeredTypes);
+        this.classLoader = requireNonNull(classLoader);
+        this.friendlyName = requireNonNull(friendlyName);
+
+        // Pre-populate with a single instance
+        release(create());
+    }
+
+    @Override
+    public byte[] serialize(final Object obj) {
+        return serialize(obj, DEFAULT_BUFFER_SIZE);
+    }
+
+    @Override
+    public byte[] serialize(final Object obj, final int bufferSize) {
+        return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
+            kryo.writeClassAndObject(output, obj);
+            output.flush();
+            return output.getByteArrayOutputStream().toByteArray();
+        }), bufferSize);
+    }
+
+    @Override
+    public void serialize(final Object obj, final ByteBuffer buffer) {
+        ByteBufferOutput out = new ByteBufferOutput(buffer);
+        Kryo kryo = borrow();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            out.flush();
+        } finally {
+            release(kryo);
+        }
+    }
+
+    @Override
+    public void serialize(final Object obj, final OutputStream stream) {
+        serialize(obj, stream, DEFAULT_BUFFER_SIZE);
+    }
+
+    @Override
+    public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
+        ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
+        Kryo kryo = borrow();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            out.flush();
+        } finally {
+            release(kryo);
+        }
+    }
+
+    @Override
+    public <T> T deserialize(final byte[] bytes) {
+        return kryoInputPool.run(input -> {
+            input.setInputStream(new ByteArrayInputStream(bytes));
+            return kryoPool.run(kryo -> {
+                @SuppressWarnings("unchecked")
+                T obj = (T) kryo.readClassAndObject(input);
+                return obj;
+            });
+        }, DEFAULT_BUFFER_SIZE);
+    }
+
+    @Override
+    public <T> T deserialize(final ByteBuffer buffer) {
+        ByteBufferInput in = new ByteBufferInput(buffer);
+        Kryo kryo = borrow();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            release(kryo);
+        }
+    }
+
+    @Override
+    public <T> T deserialize(final InputStream stream) {
+        return deserialize(stream, DEFAULT_BUFFER_SIZE);
+    }
+
+    @Override
+    public <T> T deserialize(final InputStream stream, final int bufferSize) {
+        ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
+        Kryo kryo = borrow();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            release(kryo);
+        }
+    }
+
+    /**
+     * Creates a Kryo instance.
+     *
+     * @return Kryo instance
+     */
+    @Override
+    public Kryo create() {
+        LOGGER.trace("Creating Kryo instance for {}", this);
+        Kryo kryo = new Kryo();
+        kryo.setClassLoader(classLoader);
+        kryo.setRegistrationRequired(true);
+
+        // TODO rethink whether we want to use StdInstantiatorStrategy
+        kryo.setInstantiatorStrategy(
+            new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+        int id = INITIAL_ID;
+        for (RegisteredType registeredType : registeredTypes) {
+            register(kryo, registeredType.types(), registeredType.serializer(), id++);
+        }
+        return kryo;
+    }
+
+    /**
+     * Register {@code type} and {@code serializer} to {@code kryo} instance.
+     *
+     * @param kryo       Kryo instance
+     * @param types      types to register
+     * @param serializer Specific serializer to register or null to use default.
+     * @param id         type registration id to use
+     */
+    private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
+        Registration existing = kryo.getRegistration(id);
+        if (existing != null) {
+            boolean matches = false;
+            for (Class<?> type : types) {
+                if (existing.getType() == type) {
+                    matches = true;
+                    break;
+                }
+            }
+
+            if (!matches) {
+                LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
+                    friendlyName, types, id, existing.getType());
+
+                throw new IllegalStateException(String.format(
+                    "Failed to register %s as %s, %s was already registered.",
+                    Arrays.toString(types), id, existing.getType()));
+            }
+            // falling through to register call for now.
+            // Consider skipping, if there's reasonable
+            // way to compare serializer equivalence.
+        }
+
+        for (Class<?> type : types) {
+            Registration r = null;
+            if (serializer == null) {
+                r = kryo.register(type, id);
+            } else if (type.isInterface()) {
+                kryo.addDefaultSerializer(type, serializer);
+            } else {
+                r = kryo.register(type, serializer, id);
+            }
+            if (r != null) {
+                if (r.getId() != id) {
+                    LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
+                        friendlyName, r.getType(), r.getId(), id);
+                }
+                LOGGER.trace("{} registered as {}", r.getType(), r.getId());
+            }
+        }
+    }
+
+    @Override
+    public Kryo borrow() {
+        return kryoPool.borrow();
+    }
+
+    @Override
+    public void release(final Kryo kryo) {
+        kryoPool.release(kryo);
+    }
+
+    @Override
+    public <T> T run(final KryoCallback<T> callback) {
+        return kryoPool.run(callback);
+    }
+
+    @Override
+    public String toString() {
+        if (!NO_NAME.equals(friendlyName)) {
+            return MoreObjects.toStringHelper(getClass())
+                .omitNullValues()
+                .add("friendlyName", friendlyName)
+                // omit lengthy detail, when there's a name
+                .toString();
+        }
+        return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
+    }
+}