2 * Copyright 2014-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.serializer;
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.Map.Entry;
41 import java.util.Objects;
43 import static java.util.Objects.requireNonNull;
46 * Pool of Kryo instances, with classes pre-registered.
49 public final class Namespace implements KryoFactory, KryoPool {
/**
 * Default buffer size used for serialization.
 *
 * @see #serialize(Object)
 */
public static final int DEFAULT_BUFFER_SIZE = 4096;

/**
 * Block start ID meaning "no registration ID defined". When {@link #create()} meets a block
 * that begins with this value it asks Kryo for the next free registration ID instead.
 */
private static final int FLOATING_ID = -1;

/**
 * Smallest ID free to use for user defined registrations.
 */
private static final int INITIAL_ID = 16;

/** Friendly name given to namespaces that are built without an explicit name. */
static final String NO_NAME = "(no name)";

private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);

/**
 * Default Kryo namespace, built from a builder with no registrations.
 */
public static final Namespace DEFAULT = builder().build();
77 private final KryoPool kryoPool = new KryoPool.Builder(this)
81 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
82 private final KryoInputPool kryoInputPool = new KryoInputPool();
84 private final ImmutableList<RegistrationBlock> registeredBlocks;
86 private final ClassLoader classLoader;
87 private final boolean registrationRequired;
88 private final String friendlyName;
91 * KryoNamespace builder.
94 public static final class Builder {
95 private int blockHeadId = INITIAL_ID;
96 private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
97 private List<RegistrationBlock> blocks = new ArrayList<>();
98 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
99 private boolean registrationRequired = true;
102 * Builds a {@link Namespace} instance.
104 * @return KryoNamespace
106 public Namespace build() {
107 return build(NO_NAME);
111 * Builds a {@link Namespace} instance.
113 * @param friendlyName friendly name for the namespace
114 * @return KryoNamespace
116 public Namespace build(String friendlyName) {
117 if (!types.isEmpty()) {
118 blocks.add(new RegistrationBlock(this.blockHeadId, types));
120 return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1);
124 * Registers serializer for the given set of classes.
126 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
127 * all instances will be serialized with the same type ID.
129 * @param classes list of classes to register
130 * @param serializer serializer to use for the class
133 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
134 types.add(Map.entry(classes, serializer));
139 * Sets the namespace class loader.
141 * @param classLoader the namespace class loader
142 * @return the namespace builder
144 public Builder setClassLoader(ClassLoader classLoader) {
145 this.classLoader = classLoader;
150 * Sets the registrationRequired flag.
152 * @param registrationRequired Kryo's registrationRequired flag
154 * @see Kryo#setRegistrationRequired(boolean)
156 public Builder setRegistrationRequired(boolean registrationRequired) {
157 this.registrationRequired = registrationRequired;
163 * Creates a new {@link Namespace} builder.
167 public static Builder builder() {
168 return new Builder();
172 * Creates a Kryo instance pool.
174 * @param registeredTypes types to register
175 * @param registrationRequired whether registration is required
176 * @param friendlyName friendly name for the namespace
179 final List<RegistrationBlock> registeredTypes,
180 ClassLoader classLoader,
181 boolean registrationRequired,
182 String friendlyName) {
183 this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
184 this.registrationRequired = registrationRequired;
185 this.classLoader = classLoader;
186 this.friendlyName = requireNonNull(friendlyName);
190 * Populates the Kryo pool.
192 * @param instances to add to the pool
195 public Namespace populate(int instances) {
197 for (int i = 0; i < instances; ++i) {
204 * Serializes given object to byte array using Kryo instance in pool.
206 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
208 * @param obj Object to serialize
209 * @return serialized bytes
211 public byte[] serialize(final Object obj) {
212 return serialize(obj, DEFAULT_BUFFER_SIZE);
216 * Serializes given object to byte array using Kryo instance in pool.
218 * @param obj Object to serialize
219 * @param bufferSize maximum size of serialized bytes
220 * @return serialized bytes
222 public byte[] serialize(final Object obj, final int bufferSize) {
223 return kryoOutputPool.run(output -> {
224 return kryoPool.run(kryo -> {
225 kryo.writeClassAndObject(output, obj);
227 return output.getByteArrayOutputStream().toByteArray();
233 * Serializes given object to byte buffer using Kryo instance in pool.
235 * @param obj Object to serialize
236 * @param buffer to write to
238 public void serialize(final Object obj, final ByteBuffer buffer) {
239 ByteBufferOutput out = new ByteBufferOutput(buffer);
240 Kryo kryo = borrow();
242 kryo.writeClassAndObject(out, obj);
250 * Serializes given object to OutputStream using Kryo instance in pool.
252 * @param obj Object to serialize
253 * @param stream to write to
255 public void serialize(final Object obj, final OutputStream stream) {
256 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
260 * Serializes given object to OutputStream using Kryo instance in pool.
262 * @param obj Object to serialize
263 * @param stream to write to
264 * @param bufferSize size of the buffer in front of the stream
266 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
267 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
268 Kryo kryo = borrow();
270 kryo.writeClassAndObject(out, obj);
278 * Deserializes given byte array to Object using Kryo instance in pool.
280 * @param bytes serialized bytes
281 * @param <T> deserialized Object type
282 * @return deserialized Object
284 public <T> T deserialize(final byte[] bytes) {
285 return kryoInputPool.run(input -> {
286 input.setInputStream(new ByteArrayInputStream(bytes));
287 return kryoPool.run(kryo -> {
288 @SuppressWarnings("unchecked")
289 T obj = (T) kryo.readClassAndObject(input);
292 }, DEFAULT_BUFFER_SIZE);
296 * Deserializes given byte buffer to Object using Kryo instance in pool.
298 * @param buffer input with serialized bytes
299 * @param <T> deserialized Object type
300 * @return deserialized Object
302 public <T> T deserialize(final ByteBuffer buffer) {
303 ByteBufferInput in = new ByteBufferInput(buffer);
304 Kryo kryo = borrow();
306 @SuppressWarnings("unchecked")
307 T obj = (T) kryo.readClassAndObject(in);
315 * Deserializes given InputStream to an Object using Kryo instance in pool.
317 * @param stream input stream
318 * @param <T> deserialized Object type
319 * @return deserialized Object
321 public <T> T deserialize(final InputStream stream) {
322 return deserialize(stream, DEFAULT_BUFFER_SIZE);
326 * Deserializes given InputStream to an Object using Kryo instance in pool.
328 * @param stream input stream
329 * @param <T> deserialized Object type
330 * @param bufferSize size of the buffer in front of the stream
331 * @return deserialized Object
333 public <T> T deserialize(final InputStream stream, final int bufferSize) {
334 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
335 Kryo kryo = borrow();
337 @SuppressWarnings("unchecked")
338 T obj = (T) kryo.readClassAndObject(in);
345 private String friendlyName() {
350 * Gets the number of classes registered in this Kryo namespace.
352 * @return size of namespace
355 return (int) registeredBlocks.stream()
356 .flatMap(block -> block.types().stream())
361 * Creates a Kryo instance.
363 * @return Kryo instance
366 public Kryo create() {
367 LOGGER.trace("Creating Kryo instance for {}", this);
368 Kryo kryo = new Kryo();
369 kryo.setClassLoader(classLoader);
370 kryo.setRegistrationRequired(registrationRequired);
372 // TODO rethink whether we want to use StdInstantiatorStrategy
373 kryo.setInstantiatorStrategy(
374 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
376 for (RegistrationBlock block : registeredBlocks) {
377 int id = block.begin();
378 if (id == FLOATING_ID) {
379 id = kryo.getNextRegistrationId();
381 for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
382 register(kryo, entry.getKey(), entry.getValue(), id++);
389 * Register {@code type} and {@code serializer} to {@code kryo} instance.
391 * @param kryo Kryo instance
392 * @param types types to register
393 * @param serializer Specific serializer to register or null to use default.
394 * @param id type registration id to use
396 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
397 Registration existing = kryo.getRegistration(id);
398 if (existing != null) {
399 boolean matches = false;
400 for (Class<?> type : types) {
401 if (existing.getType() == type) {
408 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
409 friendlyName(), types, id, existing.getType());
411 throw new IllegalStateException(String.format(
412 "Failed to register %s as %s, %s was already registered.",
413 Arrays.toString(types), id, existing.getType()));
415 // falling through to register call for now.
416 // Consider skipping, if there's reasonable
417 // way to compare serializer equivalence.
420 for (Class<?> type : types) {
421 Registration r = null;
422 if (serializer == null) {
423 r = kryo.register(type, id);
424 } else if (type.isInterface()) {
425 kryo.addDefaultSerializer(type, serializer);
427 r = kryo.register(type, serializer, id);
430 if (r.getId() != id) {
431 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
432 friendlyName(), r.getType(), r.getId(), id);
434 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
440 public Kryo borrow() {
441 return kryoPool.borrow();
445 public void release(Kryo kryo) {
446 kryoPool.release(kryo);
450 public <T> T run(KryoCallback<T> callback) {
451 return kryoPool.run(callback);
455 public String toString() {
456 if (!NO_NAME.equals(friendlyName)) {
457 return MoreObjects.toStringHelper(getClass())
459 .add("friendlyName", friendlyName)
460 // omit lengthy detail, when there's a name
463 return MoreObjects.toStringHelper(getClass())
464 .add("registeredBlocks", registeredBlocks)
468 static final class RegistrationBlock {
469 private final int begin;
470 private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
472 RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
474 this.types = ImmutableList.copyOf(types);
481 public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
486 public String toString() {
487 return MoreObjects.toStringHelper(getClass())
494 public int hashCode() {
495 return types.hashCode();
498 // Only the registered types are used for equality.
500 public boolean equals(Object obj) {
505 if (obj instanceof RegistrationBlock) {
506 RegistrationBlock that = (RegistrationBlock) obj;
507 return Objects.equals(this.types, that.types);