/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 package io.atomix.utils.serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

import static java.util.Objects.requireNonNull;
47 * Pool of Kryo instances, with classes pre-registered.
50 public final class Namespace implements KryoFactory, KryoPool {
53 * Default buffer size used for serialization.
55 * @see #serialize(Object)
57 public static final int DEFAULT_BUFFER_SIZE = 4096;
60 * Maximum allowed buffer size.
62 public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
65 * ID to use if this KryoNamespace does not define registration id.
67 private static final int FLOATING_ID = -1;
70 * Smallest ID free to use for user defined registrations.
72 private static final int INITIAL_ID = 16;
74 static final String NO_NAME = "(no name)";
76 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
79 * Default Kryo namespace.
81 public static final Namespace DEFAULT = builder().build();
83 private final KryoPool kryoPool = new KryoPool.Builder(this)
87 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
88 private final KryoInputPool kryoInputPool = new KryoInputPool();
90 private final ImmutableList<RegistrationBlock> registeredBlocks;
92 private final ClassLoader classLoader;
93 private final boolean compatible;
94 private final boolean registrationRequired;
95 private final String friendlyName;
98 * KryoNamespace builder.
101 public static final class Builder {
102 private int blockHeadId = INITIAL_ID;
103 private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
104 private List<RegistrationBlock> blocks = new ArrayList<>();
105 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
106 private boolean registrationRequired = true;
107 private boolean compatible = false;
110 * Builds a {@link Namespace} instance.
112 * @return KryoNamespace
114 public Namespace build() {
115 return build(NO_NAME);
119 * Builds a {@link Namespace} instance.
121 * @param friendlyName friendly name for the namespace
122 * @return KryoNamespace
124 public Namespace build(String friendlyName) {
125 if (!types.isEmpty()) {
126 blocks.add(new RegistrationBlock(this.blockHeadId, types));
128 return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
132 * Registers serializer for the given set of classes.
134 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
135 * all instances will be serialized with the same type ID.
137 * @param classes list of classes to register
138 * @param serializer serializer to use for the class
141 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
142 types.add(Map.entry(classes, serializer));
147 * Sets the namespace class loader.
149 * @param classLoader the namespace class loader
150 * @return the namespace builder
152 public Builder setClassLoader(ClassLoader classLoader) {
153 this.classLoader = classLoader;
158 * Sets whether backwards/forwards compatible versioned serialization is enabled.
160 * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
161 * default serializer for types that do not otherwise explicitly specify a serializer.
163 * @param compatible whether versioned serialization is enabled
166 public Builder setCompatible(boolean compatible) {
167 this.compatible = compatible;
172 * Sets the registrationRequired flag.
174 * @param registrationRequired Kryo's registrationRequired flag
176 * @see Kryo#setRegistrationRequired(boolean)
178 public Builder setRegistrationRequired(boolean registrationRequired) {
179 this.registrationRequired = registrationRequired;
185 * Creates a new {@link Namespace} builder.
189 public static Builder builder() {
190 return new Builder();
/**
 * Creates a Kryo instance pool.
 *
 * @param registeredTypes types to register
 * @param classLoader class loader set on every Kryo instance this namespace creates
 * @param registrationRequired whether registration is required
 * @param compatible whether compatible serialization is enabled
 * @param friendlyName friendly name for the namespace; must not be null
 */
private Namespace(
    final List<RegistrationBlock> registeredTypes,
    ClassLoader classLoader,
    boolean registrationRequired,
    boolean compatible,
    String friendlyName) {
  // Snapshot the blocks so the namespace is unaffected by later changes to the caller's list.
  this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
  this.classLoader = classLoader;
  this.compatible = compatible;
  this.registrationRequired = registrationRequired;
  this.friendlyName = requireNonNull(friendlyName);
}
215 * Populates the Kryo pool.
217 * @param instances to add to the pool
220 public Namespace populate(int instances) {
222 for (int i = 0; i < instances; ++i) {
229 * Serializes given object to byte array using Kryo instance in pool.
231 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
233 * @param obj Object to serialize
234 * @return serialized bytes
236 public byte[] serialize(final Object obj) {
237 return serialize(obj, DEFAULT_BUFFER_SIZE);
241 * Serializes given object to byte array using Kryo instance in pool.
243 * @param obj Object to serialize
244 * @param bufferSize maximum size of serialized bytes
245 * @return serialized bytes
247 public byte[] serialize(final Object obj, final int bufferSize) {
248 return kryoOutputPool.run(output -> {
249 return kryoPool.run(kryo -> {
250 kryo.writeClassAndObject(output, obj);
252 return output.getByteArrayOutputStream().toByteArray();
258 * Serializes given object to byte buffer using Kryo instance in pool.
260 * @param obj Object to serialize
261 * @param buffer to write to
263 public void serialize(final Object obj, final ByteBuffer buffer) {
264 ByteBufferOutput out = new ByteBufferOutput(buffer);
265 Kryo kryo = borrow();
267 kryo.writeClassAndObject(out, obj);
275 * Serializes given object to OutputStream using Kryo instance in pool.
277 * @param obj Object to serialize
278 * @param stream to write to
280 public void serialize(final Object obj, final OutputStream stream) {
281 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
285 * Serializes given object to OutputStream using Kryo instance in pool.
287 * @param obj Object to serialize
288 * @param stream to write to
289 * @param bufferSize size of the buffer in front of the stream
291 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
292 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
293 Kryo kryo = borrow();
295 kryo.writeClassAndObject(out, obj);
303 * Deserializes given byte array to Object using Kryo instance in pool.
305 * @param bytes serialized bytes
306 * @param <T> deserialized Object type
307 * @return deserialized Object
309 public <T> T deserialize(final byte[] bytes) {
310 return kryoInputPool.run(input -> {
311 input.setInputStream(new ByteArrayInputStream(bytes));
312 return kryoPool.run(kryo -> {
313 @SuppressWarnings("unchecked")
314 T obj = (T) kryo.readClassAndObject(input);
317 }, DEFAULT_BUFFER_SIZE);
321 * Deserializes given byte buffer to Object using Kryo instance in pool.
323 * @param buffer input with serialized bytes
324 * @param <T> deserialized Object type
325 * @return deserialized Object
327 public <T> T deserialize(final ByteBuffer buffer) {
328 ByteBufferInput in = new ByteBufferInput(buffer);
329 Kryo kryo = borrow();
331 @SuppressWarnings("unchecked")
332 T obj = (T) kryo.readClassAndObject(in);
340 * Deserializes given InputStream to an Object using Kryo instance in pool.
342 * @param stream input stream
343 * @param <T> deserialized Object type
344 * @return deserialized Object
346 public <T> T deserialize(final InputStream stream) {
347 return deserialize(stream, DEFAULT_BUFFER_SIZE);
351 * Deserializes given InputStream to an Object using Kryo instance in pool.
353 * @param stream input stream
354 * @param <T> deserialized Object type
355 * @param bufferSize size of the buffer in front of the stream
356 * @return deserialized Object
358 public <T> T deserialize(final InputStream stream, final int bufferSize) {
359 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
360 Kryo kryo = borrow();
362 @SuppressWarnings("unchecked")
363 T obj = (T) kryo.readClassAndObject(in);
370 private String friendlyName() {
375 * Gets the number of classes registered in this Kryo namespace.
377 * @return size of namespace
380 return (int) registeredBlocks.stream()
381 .flatMap(block -> block.types().stream())
386 * Creates a Kryo instance.
388 * @return Kryo instance
391 public Kryo create() {
392 LOGGER.trace("Creating Kryo instance for {}", this);
393 Kryo kryo = new Kryo();
394 kryo.setClassLoader(classLoader);
395 kryo.setRegistrationRequired(registrationRequired);
397 // If compatible serialization is enabled, override the default serializer.
399 kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
402 // TODO rethink whether we want to use StdInstantiatorStrategy
403 kryo.setInstantiatorStrategy(
404 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
406 for (RegistrationBlock block : registeredBlocks) {
407 int id = block.begin();
408 if (id == FLOATING_ID) {
409 id = kryo.getNextRegistrationId();
411 for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
412 register(kryo, entry.getKey(), entry.getValue(), id++);
419 * Register {@code type} and {@code serializer} to {@code kryo} instance.
421 * @param kryo Kryo instance
422 * @param types types to register
423 * @param serializer Specific serializer to register or null to use default.
424 * @param id type registration id to use
426 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
427 Registration existing = kryo.getRegistration(id);
428 if (existing != null) {
429 boolean matches = false;
430 for (Class<?> type : types) {
431 if (existing.getType() == type) {
438 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
439 friendlyName(), types, id, existing.getType());
441 throw new IllegalStateException(String.format(
442 "Failed to register %s as %s, %s was already registered.",
443 Arrays.toString(types), id, existing.getType()));
445 // falling through to register call for now.
446 // Consider skipping, if there's reasonable
447 // way to compare serializer equivalence.
450 for (Class<?> type : types) {
451 Registration r = null;
452 if (serializer == null) {
453 r = kryo.register(type, id);
454 } else if (type.isInterface()) {
455 kryo.addDefaultSerializer(type, serializer);
457 r = kryo.register(type, serializer, id);
460 if (r.getId() != id) {
461 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
462 friendlyName(), r.getType(), r.getId(), id);
464 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
470 public Kryo borrow() {
471 return kryoPool.borrow();
475 public void release(Kryo kryo) {
476 kryoPool.release(kryo);
480 public <T> T run(KryoCallback<T> callback) {
481 return kryoPool.run(callback);
485 public String toString() {
486 if (!NO_NAME.equals(friendlyName)) {
487 return MoreObjects.toStringHelper(getClass())
489 .add("friendlyName", friendlyName)
490 // omit lengthy detail, when there's a name
493 return MoreObjects.toStringHelper(getClass())
494 .add("registeredBlocks", registeredBlocks)
498 static final class RegistrationBlock {
499 private final int begin;
500 private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
502 RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
504 this.types = ImmutableList.copyOf(types);
511 public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
516 public String toString() {
517 return MoreObjects.toStringHelper(getClass())
524 public int hashCode() {
525 return types.hashCode();
528 // Only the registered types are used for equality.
530 public boolean equals(Object obj) {
535 if (obj instanceof RegistrationBlock) {
536 RegistrationBlock that = (RegistrationBlock) obj;
537 return Objects.equals(this.types, that.types);