2 * Copyright 2014-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.utils.serializer;
19 import static java.util.Objects.requireNonNull;
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.collect.ImmutableList;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42 import org.objenesis.strategy.StdInstantiatorStrategy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
47 * Pool of Kryo instances, with classes pre-registered.
49 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
51 * Default buffer size used for serialization.
53 * @see #serialize(Object)
55 private static final int DEFAULT_BUFFER_SIZE = 4096;
58 * ID to use if this KryoNamespace does not define registration id.
60 private static final int FLOATING_ID = -1;
63 * Smallest ID free to use for user defined registrations.
65 private static final int INITIAL_ID = 16;
67 private static final String NO_NAME = "(no name)";
69 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
71 private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
73 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
74 private final KryoInputPool kryoInputPool = new KryoInputPool();
76 private final ImmutableList<RegistrationBlock> registeredBlocks;
78 private final ClassLoader classLoader;
79 private final String friendlyName;
82 * KryoNamespace builder.
84 private static final class Builder implements JournalSerdes.Builder {
85 private final int blockHeadId = INITIAL_ID;
86 private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
87 private final List<RegistrationBlock> blocks = new ArrayList<>();
88 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
91 public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
92 types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
97 public Builder setClassLoader(final ClassLoader classLoader) {
98 this.classLoader = requireNonNull(classLoader);
103 public JournalSerdes build() {
104 return build(NO_NAME);
108 public JournalSerdes build(final String friendlyName) {
109 if (!types.isEmpty()) {
110 blocks.add(new RegistrationBlock(blockHeadId, types));
112 return new Namespace(blocks, classLoader, friendlyName);
117 * Creates a new {@link Namespace} builder.
121 public static JournalSerdes.Builder builder() {
122 return new Builder();
126 * Creates a Kryo instance pool.
128 * @param registeredTypes types to register
129 * @param registrationRequired whether registration is required
130 * @param friendlyName friendly name for the namespace
133 final List<RegistrationBlock> registeredTypes,
134 final ClassLoader classLoader,
135 final String friendlyName) {
136 registeredBlocks = ImmutableList.copyOf(registeredTypes);
137 this.classLoader = classLoader;
138 this.friendlyName = requireNonNull(friendlyName);
140 // Pre-populate with a single instance
145 public byte[] serialize(final Object obj) {
146 return serialize(obj, DEFAULT_BUFFER_SIZE);
150 public byte[] serialize(final Object obj, final int bufferSize) {
151 return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
152 kryo.writeClassAndObject(output, obj);
154 return output.getByteArrayOutputStream().toByteArray();
159 public void serialize(final Object obj, final ByteBuffer buffer) {
160 ByteBufferOutput out = new ByteBufferOutput(buffer);
161 Kryo kryo = borrow();
163 kryo.writeClassAndObject(out, obj);
171 public void serialize(final Object obj, final OutputStream stream) {
172 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
176 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
177 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
178 Kryo kryo = borrow();
180 kryo.writeClassAndObject(out, obj);
188 public <T> T deserialize(final byte[] bytes) {
189 return kryoInputPool.run(input -> {
190 input.setInputStream(new ByteArrayInputStream(bytes));
191 return kryoPool.run(kryo -> {
192 @SuppressWarnings("unchecked")
193 T obj = (T) kryo.readClassAndObject(input);
196 }, DEFAULT_BUFFER_SIZE);
200 public <T> T deserialize(final ByteBuffer buffer) {
201 ByteBufferInput in = new ByteBufferInput(buffer);
202 Kryo kryo = borrow();
204 @SuppressWarnings("unchecked")
205 T obj = (T) kryo.readClassAndObject(in);
213 public <T> T deserialize(final InputStream stream) {
214 return deserialize(stream, DEFAULT_BUFFER_SIZE);
218 public <T> T deserialize(final InputStream stream, final int bufferSize) {
219 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
220 Kryo kryo = borrow();
222 @SuppressWarnings("unchecked")
223 T obj = (T) kryo.readClassAndObject(in);
231 * Creates a Kryo instance.
233 * @return Kryo instance
236 public Kryo create() {
237 LOGGER.trace("Creating Kryo instance for {}", this);
238 Kryo kryo = new Kryo();
239 kryo.setClassLoader(classLoader);
240 kryo.setRegistrationRequired(true);
242 // TODO rethink whether we want to use StdInstantiatorStrategy
243 kryo.setInstantiatorStrategy(
244 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
246 for (RegistrationBlock block : registeredBlocks) {
247 int id = block.begin();
248 if (id == FLOATING_ID) {
249 id = kryo.getNextRegistrationId();
251 for (Entry<Class<?>[], EntrySerializer<?>> entry : block.types()) {
252 register(kryo, entry.getKey(), entry.getValue(), id++);
259 * Register {@code type} and {@code serializer} to {@code kryo} instance.
261 * @param kryo Kryo instance
262 * @param types types to register
263 * @param serializer Specific serializer to register or null to use default.
264 * @param id type registration id to use
266 private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
267 Registration existing = kryo.getRegistration(id);
268 if (existing != null) {
269 boolean matches = false;
270 for (Class<?> type : types) {
271 if (existing.getType() == type) {
278 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
279 friendlyName, types, id, existing.getType());
281 throw new IllegalStateException(String.format(
282 "Failed to register %s as %s, %s was already registered.",
283 Arrays.toString(types), id, existing.getType()));
285 // falling through to register call for now.
286 // Consider skipping, if there's reasonable
287 // way to compare serializer equivalence.
290 for (Class<?> type : types) {
291 Registration r = null;
292 if (serializer == null) {
293 r = kryo.register(type, id);
294 } else if (type.isInterface()) {
295 kryo.addDefaultSerializer(type, serializer);
297 r = kryo.register(type, serializer, id);
300 if (r.getId() != id) {
301 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
302 friendlyName, r.getType(), r.getId(), id);
304 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
310 public Kryo borrow() {
311 return kryoPool.borrow();
315 public void release(final Kryo kryo) {
316 kryoPool.release(kryo);
320 public <T> T run(final KryoCallback<T> callback) {
321 return kryoPool.run(callback);
325 public String toString() {
326 if (!NO_NAME.equals(friendlyName)) {
327 return MoreObjects.toStringHelper(getClass())
329 .add("friendlyName", friendlyName)
330 // omit lengthy detail, when there's a name
333 return MoreObjects.toStringHelper(getClass()).add("registeredBlocks", registeredBlocks).toString();
336 static final class RegistrationBlock {
337 private final int begin;
338 private final ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types;
340 RegistrationBlock(final int begin, final List<Entry<Class<?>[], EntrySerializer<?>>> types) {
342 this.types = ImmutableList.copyOf(types);
349 public ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types() {
354 public String toString() {
355 return MoreObjects.toStringHelper(getClass()).add("begin", begin).add("types", types).toString();
359 public int hashCode() {
360 return types.hashCode();
363 // Only the registered types are used for equality.
365 public boolean equals(final Object obj) {
370 if (obj instanceof RegistrationBlock that) {
371 return Objects.equals(types, that.types);