2 * Copyright 2014-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.serializer;
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.apache.commons.lang3.tuple.Pair;
30 import org.objenesis.strategy.StdInstantiatorStrategy;
31 import org.slf4j.Logger;
33 import java.io.ByteArrayInputStream;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.List;
40 import java.util.Objects;
42 import static com.google.common.base.Preconditions.checkNotNull;
43 import static org.slf4j.LoggerFactory.getLogger;
46 * Pool of Kryo instances, with classes pre-registered.
49 public final class Namespace implements KryoFactory, KryoPool {
52 * Default buffer size used for serialization.
54 * @see #serialize(Object)
56 public static final int DEFAULT_BUFFER_SIZE = 4096;
59 * Maximum allowed buffer size.
61 public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
64 * ID to use if this KryoNamespace does not define registration id.
66 public static final int FLOATING_ID = -1;
69 * Smallest ID free to use for user defined registrations.
71 public static final int INITIAL_ID = 16;
73 static final String NO_NAME = "(no name)";
75 private static final Logger LOGGER = getLogger(Namespace.class);
78 * Default Kryo namespace.
80 public static final Namespace DEFAULT = builder().build();
82 private final KryoPool kryoPool = new KryoPool.Builder(this)
86 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
87 private final KryoInputPool kryoInputPool = new KryoInputPool();
89 private final ImmutableList<RegistrationBlock> registeredBlocks;
91 private final ClassLoader classLoader;
92 private final boolean compatible;
93 private final boolean registrationRequired;
94 private final String friendlyName;
97 * KryoNamespace builder.
100 public static final class Builder {
101 private int blockHeadId = INITIAL_ID;
102 private List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
103 private List<RegistrationBlock> blocks = new ArrayList<>();
104 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
105 private boolean registrationRequired = true;
106 private boolean compatible = false;
109 * Builds a {@link Namespace} instance.
111 * @return KryoNamespace
113 public Namespace build() {
114 return build(NO_NAME);
118 * Builds a {@link Namespace} instance.
120 * @param friendlyName friendly name for the namespace
121 * @return KryoNamespace
123 public Namespace build(String friendlyName) {
124 if (!types.isEmpty()) {
125 blocks.add(new RegistrationBlock(this.blockHeadId, types));
127 return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
131 * Sets the next Kryo registration Id for following register entries.
133 * @param id Kryo registration Id
135 * @see Kryo#register(Class, Serializer, int)
137 public Builder nextId(final int id) {
138 if (!types.isEmpty()) {
139 if (id != FLOATING_ID && id < blockHeadId + types.size()) {
141 if (LOGGER.isWarnEnabled()) {
142 LOGGER.warn("requested nextId {} could potentially overlap "
143 + "with existing registrations {}+{} ",
144 id, blockHeadId, types.size(), new RuntimeException());
147 blocks.add(new RegistrationBlock(this.blockHeadId, types));
148 types = new ArrayList<>();
150 this.blockHeadId = id;
155 * Registers classes to be serialized using Kryo default serializer.
157 * @param expectedTypes list of classes
160 public Builder register(final Class<?>... expectedTypes) {
161 for (Class<?> clazz : expectedTypes) {
162 types.add(Pair.of(new Class<?>[]{clazz}, null));
168 * Registers serializer for the given set of classes.
170 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
171 * all instances will be serialized with the same type ID.
173 * @param classes list of classes to register
174 * @param serializer serializer to use for the class
177 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
178 types.add(Pair.of(classes, checkNotNull(serializer)));
182 private Builder register(RegistrationBlock block) {
183 if (block.begin() != FLOATING_ID) {
184 // flush pending types
185 nextId(block.begin());
187 nextId(block.begin() + block.types().size());
189 // flush pending types
190 final int addedBlockBegin = blockHeadId + types.size();
191 nextId(addedBlockBegin);
192 blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
193 nextId(addedBlockBegin + block.types().size());
199 * Registers all the class registered to given KryoNamespace.
201 * @param ns KryoNamespace
204 public Builder register(final Namespace ns) {
206 if (blocks.containsAll(ns.registeredBlocks)) {
207 // Everything was already registered.
208 LOGGER.debug("Ignoring {}, already registered.", ns);
211 for (RegistrationBlock block : ns.registeredBlocks) {
212 this.register(block);
218 * Sets the namespace class loader.
220 * @param classLoader the namespace class loader
221 * @return the namespace builder
223 public Builder setClassLoader(ClassLoader classLoader) {
224 this.classLoader = classLoader;
229 * Sets whether backwards/forwards compatible versioned serialization is enabled.
231 * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
232 * default serializer for types that do not otherwise explicitly specify a serializer.
234 * @param compatible whether versioned serialization is enabled
237 public Builder setCompatible(boolean compatible) {
238 this.compatible = compatible;
243 * Sets the registrationRequired flag.
245 * @param registrationRequired Kryo's registrationRequired flag
247 * @see Kryo#setRegistrationRequired(boolean)
249 public Builder setRegistrationRequired(boolean registrationRequired) {
250 this.registrationRequired = registrationRequired;
256 * Creates a new {@link Namespace} builder.
260 public static Builder builder() {
261 return new Builder();
265 * Creates a Kryo instance pool.
267 * @param registeredTypes types to register
268 * @param registrationRequired whether registration is required
269 * @param compatible whether compatible serialization is enabled
270 * @param friendlyName friendly name for the namespace
273 final List<RegistrationBlock> registeredTypes,
274 ClassLoader classLoader,
275 boolean registrationRequired,
277 String friendlyName) {
278 this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
279 this.registrationRequired = registrationRequired;
280 this.classLoader = classLoader;
281 this.compatible = compatible;
282 this.friendlyName = checkNotNull(friendlyName);
286 * Populates the Kryo pool.
288 * @param instances to add to the pool
291 public Namespace populate(int instances) {
293 for (int i = 0; i < instances; ++i) {
300 * Serializes given object to byte array using Kryo instance in pool.
302 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
304 * @param obj Object to serialize
305 * @return serialized bytes
307 public byte[] serialize(final Object obj) {
308 return serialize(obj, DEFAULT_BUFFER_SIZE);
312 * Serializes given object to byte array using Kryo instance in pool.
314 * @param obj Object to serialize
315 * @param bufferSize maximum size of serialized bytes
316 * @return serialized bytes
318 public byte[] serialize(final Object obj, final int bufferSize) {
319 return kryoOutputPool.run(output -> {
320 return kryoPool.run(kryo -> {
321 kryo.writeClassAndObject(output, obj);
323 return output.getByteArrayOutputStream().toByteArray();
329 * Serializes given object to byte buffer using Kryo instance in pool.
331 * @param obj Object to serialize
332 * @param buffer to write to
334 public void serialize(final Object obj, final ByteBuffer buffer) {
335 ByteBufferOutput out = new ByteBufferOutput(buffer);
336 Kryo kryo = borrow();
338 kryo.writeClassAndObject(out, obj);
346 * Serializes given object to OutputStream using Kryo instance in pool.
348 * @param obj Object to serialize
349 * @param stream to write to
351 public void serialize(final Object obj, final OutputStream stream) {
352 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
356 * Serializes given object to OutputStream using Kryo instance in pool.
358 * @param obj Object to serialize
359 * @param stream to write to
360 * @param bufferSize size of the buffer in front of the stream
362 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
363 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
364 Kryo kryo = borrow();
366 kryo.writeClassAndObject(out, obj);
374 * Deserializes given byte array to Object using Kryo instance in pool.
376 * @param bytes serialized bytes
377 * @param <T> deserialized Object type
378 * @return deserialized Object
380 public <T> T deserialize(final byte[] bytes) {
381 return kryoInputPool.run(input -> {
382 input.setInputStream(new ByteArrayInputStream(bytes));
383 return kryoPool.run(kryo -> {
384 @SuppressWarnings("unchecked")
385 T obj = (T) kryo.readClassAndObject(input);
388 }, DEFAULT_BUFFER_SIZE);
392 * Deserializes given byte buffer to Object using Kryo instance in pool.
394 * @param buffer input with serialized bytes
395 * @param <T> deserialized Object type
396 * @return deserialized Object
398 public <T> T deserialize(final ByteBuffer buffer) {
399 ByteBufferInput in = new ByteBufferInput(buffer);
400 Kryo kryo = borrow();
402 @SuppressWarnings("unchecked")
403 T obj = (T) kryo.readClassAndObject(in);
411 * Deserializes given InputStream to an Object using Kryo instance in pool.
413 * @param stream input stream
414 * @param <T> deserialized Object type
415 * @return deserialized Object
417 public <T> T deserialize(final InputStream stream) {
418 return deserialize(stream, DEFAULT_BUFFER_SIZE);
422 * Deserializes given InputStream to an Object using Kryo instance in pool.
424 * @param stream input stream
425 * @param <T> deserialized Object type
426 * @param bufferSize size of the buffer in front of the stream
427 * @return deserialized Object
429 public <T> T deserialize(final InputStream stream, final int bufferSize) {
430 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
431 Kryo kryo = borrow();
433 @SuppressWarnings("unchecked")
434 T obj = (T) kryo.readClassAndObject(in);
441 private String friendlyName() {
446 * Gets the number of classes registered in this Kryo namespace.
448 * @return size of namespace
451 return (int) registeredBlocks.stream()
452 .flatMap(block -> block.types().stream())
457 * Creates a Kryo instance.
459 * @return Kryo instance
462 public Kryo create() {
463 LOGGER.trace("Creating Kryo instance for {}", this);
464 Kryo kryo = new Kryo();
465 kryo.setClassLoader(classLoader);
466 kryo.setRegistrationRequired(registrationRequired);
468 // If compatible serialization is enabled, override the default serializer.
470 kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
473 // TODO rethink whether we want to use StdInstantiatorStrategy
474 kryo.setInstantiatorStrategy(
475 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
477 for (RegistrationBlock block : registeredBlocks) {
478 int id = block.begin();
479 if (id == FLOATING_ID) {
480 id = kryo.getNextRegistrationId();
482 for (Pair<Class<?>[], Serializer<?>> entry : block.types()) {
483 register(kryo, entry.getLeft(), entry.getRight(), id++);
490 * Register {@code type} and {@code serializer} to {@code kryo} instance.
492 * @param kryo Kryo instance
493 * @param types types to register
494 * @param serializer Specific serializer to register or null to use default.
495 * @param id type registration id to use
497 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
498 Registration existing = kryo.getRegistration(id);
499 if (existing != null) {
500 boolean matches = false;
501 for (Class<?> type : types) {
502 if (existing.getType() == type) {
509 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
510 friendlyName(), types, id, existing.getType());
512 throw new IllegalStateException(String.format(
513 "Failed to register %s as %s, %s was already registered.",
514 Arrays.toString(types), id, existing.getType()));
516 // falling through to register call for now.
517 // Consider skipping, if there's reasonable
518 // way to compare serializer equivalence.
521 for (Class<?> type : types) {
522 Registration r = null;
523 if (serializer == null) {
524 r = kryo.register(type, id);
525 } else if (type.isInterface()) {
526 kryo.addDefaultSerializer(type, serializer);
528 r = kryo.register(type, serializer, id);
531 if (r.getId() != id) {
532 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
533 friendlyName(), r.getType(), r.getId(), id);
535 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
541 public Kryo borrow() {
542 return kryoPool.borrow();
546 public void release(Kryo kryo) {
547 kryoPool.release(kryo);
551 public <T> T run(KryoCallback<T> callback) {
552 return kryoPool.run(callback);
556 public String toString() {
557 if (!NO_NAME.equals(friendlyName)) {
558 return MoreObjects.toStringHelper(getClass())
560 .add("friendlyName", friendlyName)
561 // omit lengthy detail, when there's a name
564 return MoreObjects.toStringHelper(getClass())
565 .add("registeredBlocks", registeredBlocks)
569 static final class RegistrationBlock {
570 private final int begin;
571 private final ImmutableList<Pair<Class<?>[], Serializer<?>>> types;
573 RegistrationBlock(int begin, List<Pair<Class<?>[], Serializer<?>>> types) {
575 this.types = ImmutableList.copyOf(types);
582 public ImmutableList<Pair<Class<?>[], Serializer<?>>> types() {
587 public String toString() {
588 return MoreObjects.toStringHelper(getClass())
595 public int hashCode() {
596 return types.hashCode();
599 // Only the registered types are used for equality.
601 public boolean equals(Object obj) {
606 if (obj instanceof RegistrationBlock) {
607 RegistrationBlock that = (RegistrationBlock) obj;
608 return Objects.equals(this.types, that.types);