2 * Copyright 2014-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.serializer;
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.objenesis.strategy.StdInstantiatorStrategy;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import java.io.ByteArrayInputStream;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.List;
41 import java.util.Map.Entry;
42 import java.util.Objects;
44 import static java.util.Objects.requireNonNull;
47 * Pool of Kryo instances, with classes pre-registered.
50 public final class Namespace implements KryoFactory, KryoPool {
53 * Default buffer size used for serialization.
55 * @see #serialize(Object)
57 public static final int DEFAULT_BUFFER_SIZE = 4096;
60 * Maximum allowed buffer size.
62 public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
65 * ID to use if this KryoNamespace does not define registration id.
67 public static final int FLOATING_ID = -1;
70 * Smallest ID free to use for user defined registrations.
72 public static final int INITIAL_ID = 16;
// Friendly name given to namespaces built through the no-argument Builder.build();
// toString() prints the registered blocks instead of the name when this value is in effect.
74 static final String NO_NAME = "(no name)";
76 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
79 * Default Kryo namespace.
81 public static final Namespace DEFAULT = builder().build();
// Pool of Kryo instances; this namespace is handed to the pool builder as its KryoFactory.
// NOTE(review): the remainder of this initializer is not visible in this listing.
83 private final KryoPool kryoPool = new KryoPool.Builder(this)
// Pools used by the byte[]-based serialize/deserialize methods.
87 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
88 private final KryoInputPool kryoInputPool = new KryoInputPool();
// Registration blocks applied, in order, to every Kryo instance built by create().
90 private final ImmutableList<RegistrationBlock> registeredBlocks;
// classLoader and registrationRequired are copied onto each Kryo instance in create().
92 private final ClassLoader classLoader;
93 private final boolean compatible;
94 private final boolean registrationRequired;
// Name used in toString() and in registration log messages.
95 private final String friendlyName;
98 * KryoNamespace builder.
101 public static final class Builder {
// Registration id at which the block currently being assembled starts.
102 private int blockHeadId = INITIAL_ID;
// Entries (class group -> serializer) collected for the block currently being assembled.
103 private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
// Blocks that have already been closed off by nextId() or build(String).
104 private List<RegistrationBlock> blocks = new ArrayList<>();
// Defaults to the context class loader of the thread that creates the builder.
105 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
106 private boolean registrationRequired = true;
107 private boolean compatible = false;
110 * Builds a {@link Namespace} instance.
112 * @return KryoNamespace
114 public Namespace build() {
115 return build(NO_NAME);
119 * Builds a {@link Namespace} instance.
121 * @param friendlyName friendly name for the namespace
122 * @return KryoNamespace
124 public Namespace build(String friendlyName) {
125 if (!types.isEmpty()) {
126 blocks.add(new RegistrationBlock(this.blockHeadId, types));
128 return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
132 * Sets the next Kryo registration Id for following register entries.
134 * @param id Kryo registration Id
136 * @see Kryo#register(Class, Serializer, int)
138 public Builder nextId(final int id) {
139 if (!types.isEmpty()) {
140 if (id != FLOATING_ID && id < blockHeadId + types.size()) {
142 if (LOGGER.isWarnEnabled()) {
143 LOGGER.warn("requested nextId {} could potentially overlap "
144 + "with existing registrations {}+{} ",
145 id, blockHeadId, types.size(), new RuntimeException());
148 blocks.add(new RegistrationBlock(this.blockHeadId, types));
149 types = new ArrayList<>();
151 this.blockHeadId = id;
156 * Registers serializer for the given set of classes.
158 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
159 * all instances will be serialized with the same type ID.
161 * @param classes list of classes to register
162 * @param serializer serializer to use for the class
165 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
166 types.add(Map.entry(classes, serializer));
171 * Sets the namespace class loader.
173 * @param classLoader the namespace class loader
174 * @return the namespace builder
176 public Builder setClassLoader(ClassLoader classLoader) {
177 this.classLoader = classLoader;
182 * Sets whether backwards/forwards compatible versioned serialization is enabled.
184 * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
185 * default serializer for types that do not otherwise explicitly specify a serializer.
187 * @param compatible whether versioned serialization is enabled
190 public Builder setCompatible(boolean compatible) {
191 this.compatible = compatible;
196 * Sets the registrationRequired flag.
198 * @param registrationRequired Kryo's registrationRequired flag
200 * @see Kryo#setRegistrationRequired(boolean)
202 public Builder setRegistrationRequired(boolean registrationRequired) {
203 this.registrationRequired = registrationRequired;
209 * Creates a new {@link Namespace} builder.
213 public static Builder builder() {
214 return new Builder();
218 * Creates a Kryo instance pool.
220 * @param registeredTypes types to register
221 * @param registrationRequired whether registration is required
222 * @param compatible whether compatible serialization is enabled
223 * @param friendlyName friendly name for the namespace
// NOTE(review): the constructor header line and the `compatible` parameter line are not visible
// in this listing, although the body below assigns `compatible` — confirm against the full file.
226 final List<RegistrationBlock> registeredTypes,
227 ClassLoader classLoader,
228 boolean registrationRequired,
230 String friendlyName) {
// Take an immutable snapshot so later changes to the caller's list cannot affect this namespace.
231 this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
232 this.registrationRequired = registrationRequired;
233 this.classLoader = classLoader;
234 this.compatible = compatible;
// A friendly name is mandatory: null is rejected here.
235 this.friendlyName = requireNonNull(friendlyName);
239 * Populates the Kryo pool.
241 * @param instances to add to the pool
// NOTE(review): the loop body and the return statement are not visible in this listing; per the
// Javadoc the loop is expected to add `instances` Kryo instances to the pool — confirm.
244 public Namespace populate(int instances) {
246 for (int i = 0; i < instances; ++i) {
253 * Serializes given object to byte array using Kryo instance in pool.
255 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
257 * @param obj Object to serialize
258 * @return serialized bytes
260 public byte[] serialize(final Object obj) {
261 return serialize(obj, DEFAULT_BUFFER_SIZE);
265 * Serializes given object to byte array using Kryo instance in pool.
267 * @param obj Object to serialize
268 * @param bufferSize maximum size of serialized bytes
269 * @return serialized bytes
271 public byte[] serialize(final Object obj, final int bufferSize) {
272 return kryoOutputPool.run(output -> {
273 return kryoPool.run(kryo -> {
274 kryo.writeClassAndObject(output, obj);
276 return output.getByteArrayOutputStream().toByteArray();
282 * Serializes given object to byte buffer using Kryo instance in pool.
284 * @param obj Object to serialize
285 * @param buffer to write to
287 public void serialize(final Object obj, final ByteBuffer buffer) {
288 ByteBufferOutput out = new ByteBufferOutput(buffer);
289 Kryo kryo = borrow();
291 kryo.writeClassAndObject(out, obj);
299 * Serializes given object to OutputStream using Kryo instance in pool.
301 * @param obj Object to serialize
302 * @param stream to write to
304 public void serialize(final Object obj, final OutputStream stream) {
305 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
309 * Serializes given object to OutputStream using Kryo instance in pool.
311 * @param obj Object to serialize
312 * @param stream to write to
313 * @param bufferSize size of the buffer in front of the stream
315 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
316 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
317 Kryo kryo = borrow();
319 kryo.writeClassAndObject(out, obj);
327 * Deserializes given byte array to Object using Kryo instance in pool.
329 * @param bytes serialized bytes
330 * @param <T> deserialized Object type
331 * @return deserialized Object
333 public <T> T deserialize(final byte[] bytes) {
334 return kryoInputPool.run(input -> {
335 input.setInputStream(new ByteArrayInputStream(bytes));
336 return kryoPool.run(kryo -> {
337 @SuppressWarnings("unchecked")
338 T obj = (T) kryo.readClassAndObject(input);
341 }, DEFAULT_BUFFER_SIZE);
345 * Deserializes given byte buffer to Object using Kryo instance in pool.
347 * @param buffer input with serialized bytes
348 * @param <T> deserialized Object type
349 * @return deserialized Object
351 public <T> T deserialize(final ByteBuffer buffer) {
352 ByteBufferInput in = new ByteBufferInput(buffer);
353 Kryo kryo = borrow();
355 @SuppressWarnings("unchecked")
356 T obj = (T) kryo.readClassAndObject(in);
364 * Deserializes given InputStream to an Object using Kryo instance in pool.
366 * @param stream input stream
367 * @param <T> deserialized Object type
368 * @return deserialized Object
370 public <T> T deserialize(final InputStream stream) {
371 return deserialize(stream, DEFAULT_BUFFER_SIZE);
375 * Deserializes given InputStream to an Object using Kryo instance in pool.
377 * @param stream input stream
378 * @param <T> deserialized Object type
379 * @param bufferSize size of the buffer in front of the stream
380 * @return deserialized Object
382 public <T> T deserialize(final InputStream stream, final int bufferSize) {
383 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
384 Kryo kryo = borrow();
386 @SuppressWarnings("unchecked")
387 T obj = (T) kryo.readClassAndObject(in);
// Accessor used as the prefix of registration log messages.
// NOTE(review): the body is not visible in this listing; presumably it returns the friendlyName field — confirm.
394 private String friendlyName() {
399 * Gets the number of classes registered in this Kryo namespace.
401 * @return size of namespace
// Flattens every block's entries into one stream; each entry is a class group sharing one serializer.
// NOTE(review): the method header and the terminal stream operation are not visible in this listing — confirm.
404 return (int) registeredBlocks.stream()
405 .flatMap(block -> block.types().stream())
410 * Creates a Kryo instance.
412 * @return Kryo instance
// Builds a Kryo instance configured with this namespace's class loader, registration-required
// flag, instantiator strategy and registered blocks.
415 public Kryo create() {
416 LOGGER.trace("Creating Kryo instance for {}", this);
417 Kryo kryo = new Kryo();
418 kryo.setClassLoader(classLoader);
419 kryo.setRegistrationRequired(registrationRequired);
421 // If compatible serialization is enabled, override the default serializer.
// NOTE(review): the conditional guarding the next call is not visible in this listing;
// presumably it tests the `compatible` field — confirm against the full file.
423 kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
426 // TODO rethink whether we want to use StdInstantiatorStrategy
427 kryo.setInstantiatorStrategy(
428 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Apply the blocks in the order they were built.
430 for (RegistrationBlock block : registeredBlocks) {
431 int id = block.begin();
// A floating block starts at whatever id Kryo reports as next.
432 if (id == FLOATING_ID) {
433 id = kryo.getNextRegistrationId();
// Each entry consumes exactly one id; the id is passed once for the entry's whole class group.
435 for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
436 register(kryo, entry.getKey(), entry.getValue(), id++);
// NOTE(review): the closing lines and the return statement are not visible in this listing.
443 * Register {@code type} and {@code serializer} to {@code kryo} instance.
445 * @param kryo Kryo instance
446 * @param types types to register
447 * @param serializer Specific serializer to register or null to use default.
448 * @param id type registration id to use
// Registers every class in `types` with `kryo` under the single registration id `id`, using
// `serializer` when one is given.
// NOTE(review): several lines of this method are not visible in this listing; the notes below
// mark the places where the visible control flow is incomplete.
450 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
// Look up whatever is already registered under the requested id.
451 Registration existing = kryo.getRegistration(id);
452 if (existing != null) {
453 boolean matches = false;
// Compare the existing registration's type against each class being registered.
454 for (Class<?> type : types) {
455 if (existing.getType() == type) {
// NOTE(review): the lines linking the comparison above to the error path below are missing;
// presumably the error is raised only when no class matched — confirm.
462 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
463 friendlyName(), types, id, existing.getType());
465 throw new IllegalStateException(String.format(
466 "Failed to register %s as %s, %s was already registered.",
467 Arrays.toString(types), id, existing.getType()));
469 // falling through to register call for now.
470 // Consider skipping, if there's reasonable
471 // way to compare serializer equivalence.
474 for (Class<?> type : types) {
475 Registration r = null;
// No explicit serializer: register the class under the id without one.
476 if (serializer == null) {
477 r = kryo.register(type, id);
// Interfaces are not registered under the id; the serializer is added as a default serializer
// for the interface instead, and r stays null on this branch.
478 } else if (type.isInterface()) {
479 kryo.addDefaultSerializer(type, serializer);
481 r = kryo.register(type, serializer, id);
// NOTE(review): r is null after the interface branch; the null guard that must precede this
// dereference is not visible in this listing — confirm it exists.
484 if (r.getId() != id) {
485 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
486 friendlyName(), r.getType(), r.getId(), id);
488 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
494 public Kryo borrow() {
495 return kryoPool.borrow();
499 public void release(Kryo kryo) {
500 kryoPool.release(kryo);
504 public <T> T run(KryoCallback<T> callback) {
505 return kryoPool.run(callback);
// Named namespaces print their friendly name; namespaces carrying NO_NAME print their
// registered blocks instead.
// NOTE(review): parts of both builder chains (including the terminal calls) are not visible in this listing.
509 public String toString() {
510 if (!NO_NAME.equals(friendlyName)) {
511 return MoreObjects.toStringHelper(getClass())
513 .add("friendlyName", friendlyName)
514 // omit lengthy detail, when there's a name
517 return MoreObjects.toStringHelper(getClass())
518 .add("registeredBlocks", registeredBlocks)
// A group of registration entries together with the registration id the group starts at.
// NOTE(review): several lines of this class (the `begin` assignment, accessor bodies, the
// toString chain) are not visible in this listing.
522 static final class RegistrationBlock {
// Starting registration id; create() compares it with FLOATING_ID.
523 private final int begin;
// Each entry maps a group of classes to the serializer registered for them.
524 private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
526 RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
// Immutable snapshot: the builder's list is not shared with this block.
528 this.types = ImmutableList.copyOf(types);
535 public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
540 public String toString() {
541 return MoreObjects.toStringHelper(getClass())
// The hash is derived from `types` only; `begin` does not take part.
548 public int hashCode() {
549 return types.hashCode();
552 // Only the registered types are used for equality.
554 public boolean equals(Object obj) {
559 if (obj instanceof RegistrationBlock) {
560 RegistrationBlock that = (RegistrationBlock) obj;
561 return Objects.equals(this.types, that.types);