2 * Copyright 2014-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.serializer;
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import io.atomix.utils.config.ConfigurationException;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.objenesis.strategy.StdInstantiatorStrategy;
32 import org.slf4j.Logger;
34 import java.io.ByteArrayInputStream;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.nio.ByteBuffer;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Objects;
44 import static com.google.common.base.Preconditions.checkNotNull;
45 import static org.slf4j.LoggerFactory.getLogger;
48 * Pool of Kryo instances, with classes pre-registered.
51 public final class Namespace implements KryoFactory, KryoPool {
54 * Default buffer size used for serialization.
56 * @see #serialize(Object)
58 public static final int DEFAULT_BUFFER_SIZE = 4096;
61 * Maximum allowed buffer size.
63 public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
66 * ID to use if this KryoNamespace does not define registration id.
68 public static final int FLOATING_ID = -1;
71 * Smallest ID free to use for user defined registrations.
73 public static final int INITIAL_ID = 16;
75 static final String NO_NAME = "(no name)";
77 private static final Logger LOGGER = getLogger(Namespace.class);
80 * Default Kryo namespace.
82 public static final Namespace DEFAULT = builder().build();
84 private final KryoPool kryoPool = new KryoPool.Builder(this)
88 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
89 private final KryoInputPool kryoInputPool = new KryoInputPool();
91 private final ImmutableList<RegistrationBlock> registeredBlocks;
93 private final ClassLoader classLoader;
94 private final boolean compatible;
95 private final boolean registrationRequired;
96 private final String friendlyName;
99 * KryoNamespace builder.
102 public static final class Builder {
103 private int blockHeadId = INITIAL_ID;
104 private List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
105 private List<RegistrationBlock> blocks = new ArrayList<>();
106 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
107 private boolean registrationRequired = true;
108 private boolean compatible = false;
111 * Builds a {@link Namespace} instance.
113 * @return KryoNamespace
115 public Namespace build() {
116 return build(NO_NAME);
120 * Builds a {@link Namespace} instance.
122 * @param friendlyName friendly name for the namespace
123 * @return KryoNamespace
125 public Namespace build(String friendlyName) {
126 if (!types.isEmpty()) {
127 blocks.add(new RegistrationBlock(this.blockHeadId, types));
129 return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
133 * Sets the next Kryo registration Id for following register entries.
135 * @param id Kryo registration Id
137 * @see Kryo#register(Class, Serializer, int)
139 public Builder nextId(final int id) {
140 if (!types.isEmpty()) {
141 if (id != FLOATING_ID && id < blockHeadId + types.size()) {
143 if (LOGGER.isWarnEnabled()) {
144 LOGGER.warn("requested nextId {} could potentially overlap "
145 + "with existing registrations {}+{} ",
146 id, blockHeadId, types.size(), new RuntimeException());
149 blocks.add(new RegistrationBlock(this.blockHeadId, types));
150 types = new ArrayList<>();
152 this.blockHeadId = id;
157 * Registers classes to be serialized using Kryo default serializer.
159 * @param expectedTypes list of classes
162 public Builder register(final Class<?>... expectedTypes) {
163 for (Class<?> clazz : expectedTypes) {
164 types.add(Pair.of(new Class<?>[]{clazz}, null));
170 * Registers serializer for the given set of classes.
172 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
173 * all instances will be serialized with the same type ID.
175 * @param classes list of classes to register
176 * @param serializer serializer to use for the class
179 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
180 types.add(Pair.of(classes, checkNotNull(serializer)));
184 private Builder register(RegistrationBlock block) {
185 if (block.begin() != FLOATING_ID) {
186 // flush pending types
187 nextId(block.begin());
189 nextId(block.begin() + block.types().size());
191 // flush pending types
192 final int addedBlockBegin = blockHeadId + types.size();
193 nextId(addedBlockBegin);
194 blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
195 nextId(addedBlockBegin + block.types().size());
201 * Registers all the class registered to given KryoNamespace.
203 * @param ns KryoNamespace
206 public Builder register(final Namespace ns) {
208 if (blocks.containsAll(ns.registeredBlocks)) {
209 // Everything was already registered.
210 LOGGER.debug("Ignoring {}, already registered.", ns);
213 for (RegistrationBlock block : ns.registeredBlocks) {
214 this.register(block);
220 * Sets the namespace class loader.
222 * @param classLoader the namespace class loader
223 * @return the namespace builder
225 public Builder setClassLoader(ClassLoader classLoader) {
226 this.classLoader = classLoader;
231 * Sets whether backwards/forwards compatible versioned serialization is enabled.
233 * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
234 * default serializer for types that do not otherwise explicitly specify a serializer.
236 * @param compatible whether versioned serialization is enabled
239 public Builder setCompatible(boolean compatible) {
240 this.compatible = compatible;
245 * Sets the registrationRequired flag.
247 * @param registrationRequired Kryo's registrationRequired flag
249 * @see Kryo#setRegistrationRequired(boolean)
251 public Builder setRegistrationRequired(boolean registrationRequired) {
252 this.registrationRequired = registrationRequired;
258 * Creates a new {@link Namespace} builder.
262 public static Builder builder() {
263 return new Builder();
266 @SuppressWarnings("unchecked")
267 private static List<RegistrationBlock> buildRegistrationBlocks(NamespaceConfig config) {
268 List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
269 List<RegistrationBlock> blocks = new ArrayList<>();
270 blocks.addAll(Namespaces.BASIC.registeredBlocks);
271 for (NamespaceTypeConfig type : config.getTypes()) {
273 if (type.getId() == null) {
274 types.add(Pair.of(new Class[]{type.getType()}, type.getSerializer() != null ? type.getSerializer().newInstance() : null));
276 blocks.add(new RegistrationBlock(type.getId(), Collections.singletonList(Pair.of(new Class[]{type.getType()}, type.getSerializer().newInstance()))));
278 } catch (InstantiationException | IllegalAccessException e) {
279 throw new ConfigurationException("Failed to instantiate serializer from configuration", e);
282 blocks.add(new RegistrationBlock(FLOATING_ID, types));
286 public Namespace(NamespaceConfig config) {
287 this(buildRegistrationBlocks(config), Thread.currentThread().getContextClassLoader(), config.isRegistrationRequired(), config.isCompatible(), config.getName());
291 * Creates a Kryo instance pool.
293 * @param registeredTypes types to register
294 * @param registrationRequired whether registration is required
295 * @param compatible whether compatible serialization is enabled
296 * @param friendlyName friendly name for the namespace
299 final List<RegistrationBlock> registeredTypes,
300 ClassLoader classLoader,
301 boolean registrationRequired,
303 String friendlyName) {
304 this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
305 this.registrationRequired = registrationRequired;
306 this.classLoader = classLoader;
307 this.compatible = compatible;
308 this.friendlyName = checkNotNull(friendlyName);
312 * Populates the Kryo pool.
314 * @param instances to add to the pool
317 public Namespace populate(int instances) {
319 for (int i = 0; i < instances; ++i) {
326 * Serializes given object to byte array using Kryo instance in pool.
328 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
330 * @param obj Object to serialize
331 * @return serialized bytes
333 public byte[] serialize(final Object obj) {
334 return serialize(obj, DEFAULT_BUFFER_SIZE);
338 * Serializes given object to byte array using Kryo instance in pool.
340 * @param obj Object to serialize
341 * @param bufferSize maximum size of serialized bytes
342 * @return serialized bytes
344 public byte[] serialize(final Object obj, final int bufferSize) {
345 return kryoOutputPool.run(output -> {
346 return kryoPool.run(kryo -> {
347 kryo.writeClassAndObject(output, obj);
349 return output.getByteArrayOutputStream().toByteArray();
355 * Serializes given object to byte buffer using Kryo instance in pool.
357 * @param obj Object to serialize
358 * @param buffer to write to
360 public void serialize(final Object obj, final ByteBuffer buffer) {
361 ByteBufferOutput out = new ByteBufferOutput(buffer);
362 Kryo kryo = borrow();
364 kryo.writeClassAndObject(out, obj);
372 * Serializes given object to OutputStream using Kryo instance in pool.
374 * @param obj Object to serialize
375 * @param stream to write to
377 public void serialize(final Object obj, final OutputStream stream) {
378 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
382 * Serializes given object to OutputStream using Kryo instance in pool.
384 * @param obj Object to serialize
385 * @param stream to write to
386 * @param bufferSize size of the buffer in front of the stream
388 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
389 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
390 Kryo kryo = borrow();
392 kryo.writeClassAndObject(out, obj);
400 * Deserializes given byte array to Object using Kryo instance in pool.
402 * @param bytes serialized bytes
403 * @param <T> deserialized Object type
404 * @return deserialized Object
406 public <T> T deserialize(final byte[] bytes) {
407 return kryoInputPool.run(input -> {
408 input.setInputStream(new ByteArrayInputStream(bytes));
409 return kryoPool.run(kryo -> {
410 @SuppressWarnings("unchecked")
411 T obj = (T) kryo.readClassAndObject(input);
414 }, DEFAULT_BUFFER_SIZE);
418 * Deserializes given byte buffer to Object using Kryo instance in pool.
420 * @param buffer input with serialized bytes
421 * @param <T> deserialized Object type
422 * @return deserialized Object
424 public <T> T deserialize(final ByteBuffer buffer) {
425 ByteBufferInput in = new ByteBufferInput(buffer);
426 Kryo kryo = borrow();
428 @SuppressWarnings("unchecked")
429 T obj = (T) kryo.readClassAndObject(in);
437 * Deserializes given InputStream to an Object using Kryo instance in pool.
439 * @param stream input stream
440 * @param <T> deserialized Object type
441 * @return deserialized Object
443 public <T> T deserialize(final InputStream stream) {
444 return deserialize(stream, DEFAULT_BUFFER_SIZE);
448 * Deserializes given InputStream to an Object using Kryo instance in pool.
450 * @param stream input stream
451 * @param <T> deserialized Object type
452 * @param bufferSize size of the buffer in front of the stream
453 * @return deserialized Object
455 public <T> T deserialize(final InputStream stream, final int bufferSize) {
456 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
457 Kryo kryo = borrow();
459 @SuppressWarnings("unchecked")
460 T obj = (T) kryo.readClassAndObject(in);
467 private String friendlyName() {
472 * Gets the number of classes registered in this Kryo namespace.
474 * @return size of namespace
477 return (int) registeredBlocks.stream()
478 .flatMap(block -> block.types().stream())
483 * Creates a Kryo instance.
485 * @return Kryo instance
488 public Kryo create() {
489 LOGGER.trace("Creating Kryo instance for {}", this);
490 Kryo kryo = new Kryo();
491 kryo.setClassLoader(classLoader);
492 kryo.setRegistrationRequired(registrationRequired);
494 // If compatible serialization is enabled, override the default serializer.
496 kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
499 // TODO rethink whether we want to use StdInstantiatorStrategy
500 kryo.setInstantiatorStrategy(
501 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
503 for (RegistrationBlock block : registeredBlocks) {
504 int id = block.begin();
505 if (id == FLOATING_ID) {
506 id = kryo.getNextRegistrationId();
508 for (Pair<Class<?>[], Serializer<?>> entry : block.types()) {
509 register(kryo, entry.getLeft(), entry.getRight(), id++);
516 * Register {@code type} and {@code serializer} to {@code kryo} instance.
518 * @param kryo Kryo instance
519 * @param types types to register
520 * @param serializer Specific serializer to register or null to use default.
521 * @param id type registration id to use
523 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
524 Registration existing = kryo.getRegistration(id);
525 if (existing != null) {
526 boolean matches = false;
527 for (Class<?> type : types) {
528 if (existing.getType() == type) {
535 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
536 friendlyName(), types, id, existing.getType());
538 throw new IllegalStateException(String.format(
539 "Failed to register %s as %s, %s was already registered.",
540 Arrays.toString(types), id, existing.getType()));
542 // falling through to register call for now.
543 // Consider skipping, if there's reasonable
544 // way to compare serializer equivalence.
547 for (Class<?> type : types) {
548 Registration r = null;
549 if (serializer == null) {
550 r = kryo.register(type, id);
551 } else if (type.isInterface()) {
552 kryo.addDefaultSerializer(type, serializer);
554 r = kryo.register(type, serializer, id);
557 if (r.getId() != id) {
558 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
559 friendlyName(), r.getType(), r.getId(), id);
561 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
567 public Kryo borrow() {
568 return kryoPool.borrow();
572 public void release(Kryo kryo) {
573 kryoPool.release(kryo);
577 public <T> T run(KryoCallback<T> callback) {
578 return kryoPool.run(callback);
582 public String toString() {
583 if (!NO_NAME.equals(friendlyName)) {
584 return MoreObjects.toStringHelper(getClass())
586 .add("friendlyName", friendlyName)
587 // omit lengthy detail, when there's a name
590 return MoreObjects.toStringHelper(getClass())
591 .add("registeredBlocks", registeredBlocks)
595 static final class RegistrationBlock {
596 private final int begin;
597 private final ImmutableList<Pair<Class<?>[], Serializer<?>>> types;
599 RegistrationBlock(int begin, List<Pair<Class<?>[], Serializer<?>>> types) {
601 this.types = ImmutableList.copyOf(types);
608 public ImmutableList<Pair<Class<?>[], Serializer<?>>> types() {
613 public String toString() {
614 return MoreObjects.toStringHelper(getClass())
621 public int hashCode() {
622 return types.hashCode();
625 // Only the registered types are used for equality.
627 public boolean equals(Object obj) {
632 if (obj instanceof RegistrationBlock) {
633 RegistrationBlock that = (RegistrationBlock) obj;
634 return Objects.equals(this.types, that.types);