2 * Copyright 2014-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.utils.serializer;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
22 import com.esotericsoftware.kryo.Kryo;
23 import com.esotericsoftware.kryo.Registration;
24 import com.esotericsoftware.kryo.Serializer;
25 import com.esotericsoftware.kryo.io.ByteBufferInput;
26 import com.esotericsoftware.kryo.io.ByteBufferOutput;
27 import com.esotericsoftware.kryo.pool.KryoCallback;
28 import com.esotericsoftware.kryo.pool.KryoFactory;
29 import com.esotericsoftware.kryo.pool.KryoPool;
30 import com.google.common.base.MoreObjects;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.Map.Entry;
41 import org.objenesis.strategy.StdInstantiatorStrategy;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Pool of Kryo instances, with classes pre-registered.
48 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
50 * Default buffer size used for serialization.
52 * @see #serialize(Object)
54 private static final int DEFAULT_BUFFER_SIZE = 4096;
57 * Smallest ID free to use for user defined registrations.
59 private static final int INITIAL_ID = 16;
61 private static final String NO_NAME = "(no name)";
63 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
65 private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
67 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
68 private final KryoInputPool kryoInputPool = new KryoInputPool();
70 private final List<Entry<Class<?>[], EntrySerializer<?>>> registeredTypes;
72 private final ClassLoader classLoader;
73 private final String friendlyName;
76 * Creates a Kryo instance pool.
78 * @param registeredTypes types to register
79 * @param registrationRequired whether registration is required
80 * @param friendlyName friendly name for the namespace
83 final List<Entry<Class<?>[], EntrySerializer<?>>> registeredTypes,
84 final ClassLoader classLoader,
85 final String friendlyName) {
86 this.registeredTypes = List.copyOf(registeredTypes);
87 this.classLoader = requireNonNull(classLoader);
88 this.friendlyName = requireNonNull(friendlyName);
90 // Pre-populate with a single instance
95 * Creates a new {@link Namespace} builder.
99 public static JournalSerdes.Builder builder() {
100 return new Builder();
104 public byte[] serialize(final Object obj) {
105 return serialize(obj, DEFAULT_BUFFER_SIZE);
109 public byte[] serialize(final Object obj, final int bufferSize) {
110 return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
111 kryo.writeClassAndObject(output, obj);
113 return output.getByteArrayOutputStream().toByteArray();
118 public void serialize(final Object obj, final ByteBuffer buffer) {
119 ByteBufferOutput out = new ByteBufferOutput(buffer);
120 Kryo kryo = borrow();
122 kryo.writeClassAndObject(out, obj);
130 public void serialize(final Object obj, final OutputStream stream) {
131 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
135 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
136 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
137 Kryo kryo = borrow();
139 kryo.writeClassAndObject(out, obj);
147 public <T> T deserialize(final byte[] bytes) {
148 return kryoInputPool.run(input -> {
149 input.setInputStream(new ByteArrayInputStream(bytes));
150 return kryoPool.run(kryo -> {
151 @SuppressWarnings("unchecked")
152 T obj = (T) kryo.readClassAndObject(input);
155 }, DEFAULT_BUFFER_SIZE);
159 public <T> T deserialize(final ByteBuffer buffer) {
160 ByteBufferInput in = new ByteBufferInput(buffer);
161 Kryo kryo = borrow();
163 @SuppressWarnings("unchecked")
164 T obj = (T) kryo.readClassAndObject(in);
172 public <T> T deserialize(final InputStream stream) {
173 return deserialize(stream, DEFAULT_BUFFER_SIZE);
177 public <T> T deserialize(final InputStream stream, final int bufferSize) {
178 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
179 Kryo kryo = borrow();
181 @SuppressWarnings("unchecked")
182 T obj = (T) kryo.readClassAndObject(in);
190 * Creates a Kryo instance.
192 * @return Kryo instance
195 public Kryo create() {
196 LOGGER.trace("Creating Kryo instance for {}", this);
197 Kryo kryo = new Kryo();
198 kryo.setClassLoader(classLoader);
199 kryo.setRegistrationRequired(true);
201 // TODO rethink whether we want to use StdInstantiatorStrategy
202 kryo.setInstantiatorStrategy(
203 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
206 for (Entry<Class<?>[], EntrySerializer<?>> entry : registeredTypes) {
207 register(kryo, entry.getKey(), entry.getValue(), id++);
213 * Register {@code type} and {@code serializer} to {@code kryo} instance.
215 * @param kryo Kryo instance
216 * @param types types to register
217 * @param serializer Specific serializer to register or null to use default.
218 * @param id type registration id to use
220 private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
221 Registration existing = kryo.getRegistration(id);
222 if (existing != null) {
223 boolean matches = false;
224 for (Class<?> type : types) {
225 if (existing.getType() == type) {
232 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
233 friendlyName, types, id, existing.getType());
235 throw new IllegalStateException(String.format(
236 "Failed to register %s as %s, %s was already registered.",
237 Arrays.toString(types), id, existing.getType()));
239 // falling through to register call for now.
240 // Consider skipping, if there's reasonable
241 // way to compare serializer equivalence.
244 for (Class<?> type : types) {
245 Registration r = null;
246 if (serializer == null) {
247 r = kryo.register(type, id);
248 } else if (type.isInterface()) {
249 kryo.addDefaultSerializer(type, serializer);
251 r = kryo.register(type, serializer, id);
254 if (r.getId() != id) {
255 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
256 friendlyName, r.getType(), r.getId(), id);
258 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
264 public Kryo borrow() {
265 return kryoPool.borrow();
269 public void release(final Kryo kryo) {
270 kryoPool.release(kryo);
274 public <T> T run(final KryoCallback<T> callback) {
275 return kryoPool.run(callback);
279 public String toString() {
280 if (!NO_NAME.equals(friendlyName)) {
281 return MoreObjects.toStringHelper(getClass())
283 .add("friendlyName", friendlyName)
284 // omit lengthy detail, when there's a name
287 return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
290 private static final class Builder implements JournalSerdes.Builder {
291 private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
292 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
295 public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
296 types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
301 public Builder setClassLoader(final ClassLoader classLoader) {
302 this.classLoader = requireNonNull(classLoader);
307 public JournalSerdes build() {
308 return build(NO_NAME);
312 public JournalSerdes build(final String friendlyName) {
313 checkState(!types.isEmpty(), "No serializers registered");
314 return new Namespace(types, classLoader, friendlyName);