/*
 * Copyright 2014-2021 Open Networking Foundation
 * Copyright 2023 PANTHEON.tech, s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 package io.atomix.utils.serializer;
19 import static java.util.Objects.requireNonNull;
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import io.atomix.storage.journal.JournalSerdes;
31 import java.io.ByteArrayInputStream;
32 import java.io.InputStream;
33 import java.io.OutputStream;
34 import java.nio.ByteBuffer;
35 import java.util.Arrays;
36 import java.util.List;
37 import org.objenesis.strategy.StdInstantiatorStrategy;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * Pool of Kryo instances, with classes pre-registered.
44 @Deprecated(forRemoval = true, since = "9.0.3")
45 final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
47 * Default buffer size used for serialization.
49 * @see #serialize(Object)
51 private static final int DEFAULT_BUFFER_SIZE = 4096;
54 * Smallest ID free to use for user defined registrations.
56 private static final int INITIAL_ID = 16;
58 static final String NO_NAME = "(no name)";
60 private static final Logger LOGGER = LoggerFactory.getLogger(KryoJournalSerdes.class);
62 private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
64 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
65 private final KryoInputPool kryoInputPool = new KryoInputPool();
67 private final List<RegisteredType> registeredTypes;
68 private final ClassLoader classLoader;
69 private final String friendlyName;
72 * Creates a Kryo instance pool.
74 * @param registeredTypes types to register
75 * @param registrationRequired whether registration is required
76 * @param friendlyName friendly name for the namespace
79 final List<RegisteredType> registeredTypes,
80 final ClassLoader classLoader,
81 final String friendlyName) {
82 this.registeredTypes = List.copyOf(registeredTypes);
83 this.classLoader = requireNonNull(classLoader);
84 this.friendlyName = requireNonNull(friendlyName);
86 // Pre-populate with a single instance
91 public byte[] serialize(final Object obj) {
92 return serialize(obj, DEFAULT_BUFFER_SIZE);
96 public byte[] serialize(final Object obj, final int bufferSize) {
97 return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
98 kryo.writeClassAndObject(output, obj);
100 return output.getByteArrayOutputStream().toByteArray();
105 public void serialize(final Object obj, final ByteBuffer buffer) {
106 ByteBufferOutput out = new ByteBufferOutput(buffer);
107 Kryo kryo = borrow();
109 kryo.writeClassAndObject(out, obj);
117 public void serialize(final Object obj, final OutputStream stream) {
118 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
122 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
123 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
124 Kryo kryo = borrow();
126 kryo.writeClassAndObject(out, obj);
134 public <T> T deserialize(final byte[] bytes) {
135 return kryoInputPool.run(input -> {
136 input.setInputStream(new ByteArrayInputStream(bytes));
137 return kryoPool.run(kryo -> {
138 @SuppressWarnings("unchecked")
139 T obj = (T) kryo.readClassAndObject(input);
142 }, DEFAULT_BUFFER_SIZE);
146 public <T> T deserialize(final ByteBuffer buffer) {
147 Kryo kryo = borrow();
149 @SuppressWarnings("unchecked")
150 T obj = (T) kryo.readClassAndObject(new Kryo505ByteBufferInput(buffer));
158 public <T> T deserialize(final InputStream stream) {
159 return deserialize(stream, DEFAULT_BUFFER_SIZE);
163 public <T> T deserialize(final InputStream stream, final int bufferSize) {
164 Kryo kryo = borrow();
166 @SuppressWarnings("unchecked")
167 T obj = (T) kryo.readClassAndObject(new ByteBufferInput(stream, bufferSize));
175 * Creates a Kryo instance.
177 * @return Kryo instance
180 public Kryo create() {
181 LOGGER.trace("Creating Kryo instance for {}", this);
182 Kryo kryo = new Kryo();
183 kryo.setClassLoader(classLoader);
184 kryo.setRegistrationRequired(true);
186 // TODO rethink whether we want to use StdInstantiatorStrategy
187 kryo.setInstantiatorStrategy(
188 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
191 for (RegisteredType registeredType : registeredTypes) {
192 register(kryo, registeredType.types(), registeredType.serializer(), id++);
198 * Register {@code type} and {@code serializer} to {@code kryo} instance.
200 * @param kryo Kryo instance
201 * @param types types to register
202 * @param serializer Specific serializer to register or null to use default.
203 * @param id type registration id to use
205 private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
206 Registration existing = kryo.getRegistration(id);
207 if (existing != null) {
208 boolean matches = false;
209 for (Class<?> type : types) {
210 if (existing.getType() == type) {
217 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
218 friendlyName, types, id, existing.getType());
220 throw new IllegalStateException(String.format(
221 "Failed to register %s as %s, %s was already registered.",
222 Arrays.toString(types), id, existing.getType()));
224 // falling through to register call for now.
225 // Consider skipping, if there's reasonable
226 // way to compare serializer equivalence.
229 for (Class<?> type : types) {
230 Registration reg = null;
231 if (serializer == null) {
232 reg = kryo.register(type, id);
233 } else if (type.isInterface()) {
234 kryo.addDefaultSerializer(type, serializer);
236 reg = kryo.register(type, serializer, id);
239 if (reg.getId() != id) {
240 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
241 friendlyName, reg.getType(), reg.getId(), id);
243 LOGGER.trace("{} registered as {}", reg.getType(), reg.getId());
249 public Kryo borrow() {
250 return kryoPool.borrow();
254 public void release(final Kryo kryo) {
255 kryoPool.release(kryo);
259 public <T> T run(final KryoCallback<T> callback) {
260 return kryoPool.run(callback);
264 public String toString() {
265 if (!NO_NAME.equals(friendlyName)) {
266 return MoreObjects.toStringHelper(getClass())
268 .add("friendlyName", friendlyName)
269 // omit lengthy detail, when there's a name
272 return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();