+++ /dev/null
-/*
- * Copyright 2014-2021 Open Networking Foundation
- * Copyright 2023 PANTHEON.tech, s.r.o.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.atomix.utils.serializer;
-
-import static java.util.Objects.requireNonNull;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Registration;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.ByteBufferInput;
-import com.esotericsoftware.kryo.io.ByteBufferOutput;
-import com.esotericsoftware.kryo.pool.KryoCallback;
-import com.esotericsoftware.kryo.pool.KryoFactory;
-import com.esotericsoftware.kryo.pool.KryoPool;
-import com.google.common.base.MoreObjects;
-import io.atomix.storage.journal.JournalSerdes;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pool of Kryo instances, with classes pre-registered.
- */
-final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
- /**
- * Default buffer size used for serialization.
- *
- * @see #serialize(Object)
- */
- private static final int DEFAULT_BUFFER_SIZE = 4096;
-
- /**
- * Smallest ID free to use for user defined registrations.
- */
- private static final int INITIAL_ID = 16;
-
- static final String NO_NAME = "(no name)";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KryoJournalSerdes.class);
-
- private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
-
- private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
- private final KryoInputPool kryoInputPool = new KryoInputPool();
-
- private final List<RegisteredType> registeredTypes;
- private final ClassLoader classLoader;
- private final String friendlyName;
-
- /**
- * Creates a Kryo instance pool.
- *
- * @param registeredTypes types to register
- * @param registrationRequired whether registration is required
- * @param friendlyName friendly name for the namespace
- */
- KryoJournalSerdes(
- final List<RegisteredType> registeredTypes,
- final ClassLoader classLoader,
- final String friendlyName) {
- this.registeredTypes = List.copyOf(registeredTypes);
- this.classLoader = requireNonNull(classLoader);
- this.friendlyName = requireNonNull(friendlyName);
-
- // Pre-populate with a single instance
- release(create());
- }
-
- @Override
- public byte[] serialize(final Object obj) {
- return serialize(obj, DEFAULT_BUFFER_SIZE);
- }
-
- @Override
- public byte[] serialize(final Object obj, final int bufferSize) {
- return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
- kryo.writeClassAndObject(output, obj);
- output.flush();
- return output.getByteArrayOutputStream().toByteArray();
- }), bufferSize);
- }
-
- @Override
- public void serialize(final Object obj, final ByteBuffer buffer) {
- ByteBufferOutput out = new ByteBufferOutput(buffer);
- Kryo kryo = borrow();
- try {
- kryo.writeClassAndObject(out, obj);
- out.flush();
- } finally {
- release(kryo);
- }
- }
-
- @Override
- public void serialize(final Object obj, final OutputStream stream) {
- serialize(obj, stream, DEFAULT_BUFFER_SIZE);
- }
-
- @Override
- public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
- ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
- Kryo kryo = borrow();
- try {
- kryo.writeClassAndObject(out, obj);
- out.flush();
- } finally {
- release(kryo);
- }
- }
-
- @Override
- public <T> T deserialize(final byte[] bytes) {
- return kryoInputPool.run(input -> {
- input.setInputStream(new ByteArrayInputStream(bytes));
- return kryoPool.run(kryo -> {
- @SuppressWarnings("unchecked")
- T obj = (T) kryo.readClassAndObject(input);
- return obj;
- });
- }, DEFAULT_BUFFER_SIZE);
- }
-
- @Override
- public <T> T deserialize(final ByteBuffer buffer) {
- ByteBufferInput in = new ByteBufferInput(buffer);
- Kryo kryo = borrow();
- try {
- @SuppressWarnings("unchecked")
- T obj = (T) kryo.readClassAndObject(in);
- return obj;
- } finally {
- release(kryo);
- }
- }
-
- @Override
- public <T> T deserialize(final InputStream stream) {
- return deserialize(stream, DEFAULT_BUFFER_SIZE);
- }
-
- @Override
- public <T> T deserialize(final InputStream stream, final int bufferSize) {
- ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
- Kryo kryo = borrow();
- try {
- @SuppressWarnings("unchecked")
- T obj = (T) kryo.readClassAndObject(in);
- return obj;
- } finally {
- release(kryo);
- }
- }
-
- /**
- * Creates a Kryo instance.
- *
- * @return Kryo instance
- */
- @Override
- public Kryo create() {
- LOGGER.trace("Creating Kryo instance for {}", this);
- Kryo kryo = new Kryo();
- kryo.setClassLoader(classLoader);
- kryo.setRegistrationRequired(true);
-
- // TODO rethink whether we want to use StdInstantiatorStrategy
- kryo.setInstantiatorStrategy(
- new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
-
- int id = INITIAL_ID;
- for (RegisteredType registeredType : registeredTypes) {
- register(kryo, registeredType.types(), registeredType.serializer(), id++);
- }
- return kryo;
- }
-
- /**
- * Register {@code type} and {@code serializer} to {@code kryo} instance.
- *
- * @param kryo Kryo instance
- * @param types types to register
- * @param serializer Specific serializer to register or null to use default.
- * @param id type registration id to use
- */
- private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
- Registration existing = kryo.getRegistration(id);
- if (existing != null) {
- boolean matches = false;
- for (Class<?> type : types) {
- if (existing.getType() == type) {
- matches = true;
- break;
- }
- }
-
- if (!matches) {
- LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
- friendlyName, types, id, existing.getType());
-
- throw new IllegalStateException(String.format(
- "Failed to register %s as %s, %s was already registered.",
- Arrays.toString(types), id, existing.getType()));
- }
- // falling through to register call for now.
- // Consider skipping, if there's reasonable
- // way to compare serializer equivalence.
- }
-
- for (Class<?> type : types) {
- Registration r = null;
- if (serializer == null) {
- r = kryo.register(type, id);
- } else if (type.isInterface()) {
- kryo.addDefaultSerializer(type, serializer);
- } else {
- r = kryo.register(type, serializer, id);
- }
- if (r != null) {
- if (r.getId() != id) {
- LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
- friendlyName, r.getType(), r.getId(), id);
- }
- LOGGER.trace("{} registered as {}", r.getType(), r.getId());
- }
- }
- }
-
- @Override
- public Kryo borrow() {
- return kryoPool.borrow();
- }
-
- @Override
- public void release(final Kryo kryo) {
- kryoPool.release(kryo);
- }
-
- @Override
- public <T> T run(final KryoCallback<T> callback) {
- return kryoPool.run(callback);
- }
-
- @Override
- public String toString() {
- if (!NO_NAME.equals(friendlyName)) {
- return MoreObjects.toStringHelper(getClass())
- .omitNullValues()
- .add("friendlyName", friendlyName)
- // omit lengthy detail, when there's a name
- .toString();
- }
- return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
- }
-}