2 * Copyright 2014-present Open Networking Foundation
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package io.atomix.utils.serializer;
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.Map.Entry;
41 import java.util.Objects;
43 import static java.util.Objects.requireNonNull;
46 * Pool of Kryo instances, with classes pre-registered.
49 public final class Namespace implements KryoFactory, KryoPool {
52 * Default buffer size used for serialization.
54 * @see #serialize(Object)
56 public static final int DEFAULT_BUFFER_SIZE = 4096;
59 * Maximum allowed buffer size.
61 public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
64 * ID to use if this namespace does not define a registration id.
66 private static final int FLOATING_ID = -1;
69 * Smallest ID free to use for user defined registrations.
71 private static final int INITIAL_ID = 16;
73 static final String NO_NAME = "(no name)";
75 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
78 * Default Kryo namespace.
80 public static final Namespace DEFAULT = builder().build();
82 private final KryoPool kryoPool = new KryoPool.Builder(this)
86 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
87 private final KryoInputPool kryoInputPool = new KryoInputPool();
89 private final ImmutableList<RegistrationBlock> registeredBlocks;
91 private final ClassLoader classLoader;
92 private final boolean registrationRequired;
93 private final String friendlyName;
96 * KryoNamespace builder.
99 public static final class Builder {
100 private int blockHeadId = INITIAL_ID;
101 private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
102 private List<RegistrationBlock> blocks = new ArrayList<>();
103 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
104 private boolean registrationRequired = true;
107 * Builds a {@link Namespace} instance.
109 * @return KryoNamespace
111 public Namespace build() {
112 return build(NO_NAME);
116 * Builds a {@link Namespace} instance.
118 * @param friendlyName friendly name for the namespace
119 * @return KryoNamespace
121 public Namespace build(String friendlyName) {
122 if (!types.isEmpty()) {
123 blocks.add(new RegistrationBlock(this.blockHeadId, types));
125 return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1);
129 * Registers serializer for the given set of classes.
131 * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
132 * all instances will be serialized with the same type ID.
134 * @param classes list of classes to register
135 * @param serializer serializer to use for the class
138 public Builder register(Serializer<?> serializer, final Class<?>... classes) {
139 types.add(Map.entry(classes, serializer));
144 * Sets the namespace class loader.
146 * @param classLoader the namespace class loader
147 * @return the namespace builder
149 public Builder setClassLoader(ClassLoader classLoader) {
150 this.classLoader = classLoader;
155 * Sets the registrationRequired flag.
157 * @param registrationRequired Kryo's registrationRequired flag
159 * @see Kryo#setRegistrationRequired(boolean)
161 public Builder setRegistrationRequired(boolean registrationRequired) {
162 this.registrationRequired = registrationRequired;
168 * Creates a new {@link Namespace} builder.
172 public static Builder builder() {
173 return new Builder();
177 * Creates a Kryo instance pool.
179 * @param registeredTypes types to register
180 * @param registrationRequired whether registration is required
181 * @param friendlyName friendly name for the namespace
184 final List<RegistrationBlock> registeredTypes,
185 ClassLoader classLoader,
186 boolean registrationRequired,
187 String friendlyName) {
188 this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
189 this.registrationRequired = registrationRequired;
190 this.classLoader = classLoader;
191 this.friendlyName = requireNonNull(friendlyName);
195 * Populates the Kryo pool.
197 * @param instances to add to the pool
200 public Namespace populate(int instances) {
202 for (int i = 0; i < instances; ++i) {
209 * Serializes given object to byte array using Kryo instance in pool.
211 * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
213 * @param obj Object to serialize
214 * @return serialized bytes
216 public byte[] serialize(final Object obj) {
217 return serialize(obj, DEFAULT_BUFFER_SIZE);
221 * Serializes given object to byte array using Kryo instance in pool.
223 * @param obj Object to serialize
224 * @param bufferSize maximum size of serialized bytes
225 * @return serialized bytes
227 public byte[] serialize(final Object obj, final int bufferSize) {
228 return kryoOutputPool.run(output -> {
229 return kryoPool.run(kryo -> {
230 kryo.writeClassAndObject(output, obj);
232 return output.getByteArrayOutputStream().toByteArray();
238 * Serializes given object to byte buffer using Kryo instance in pool.
240 * @param obj Object to serialize
241 * @param buffer to write to
243 public void serialize(final Object obj, final ByteBuffer buffer) {
244 ByteBufferOutput out = new ByteBufferOutput(buffer);
245 Kryo kryo = borrow();
247 kryo.writeClassAndObject(out, obj);
255 * Serializes given object to OutputStream using Kryo instance in pool.
257 * @param obj Object to serialize
258 * @param stream to write to
260 public void serialize(final Object obj, final OutputStream stream) {
261 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
265 * Serializes given object to OutputStream using Kryo instance in pool.
267 * @param obj Object to serialize
268 * @param stream to write to
269 * @param bufferSize size of the buffer in front of the stream
271 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
272 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
273 Kryo kryo = borrow();
275 kryo.writeClassAndObject(out, obj);
283 * Deserializes given byte array to Object using Kryo instance in pool.
285 * @param bytes serialized bytes
286 * @param <T> deserialized Object type
287 * @return deserialized Object
289 public <T> T deserialize(final byte[] bytes) {
290 return kryoInputPool.run(input -> {
291 input.setInputStream(new ByteArrayInputStream(bytes));
292 return kryoPool.run(kryo -> {
293 @SuppressWarnings("unchecked")
294 T obj = (T) kryo.readClassAndObject(input);
297 }, DEFAULT_BUFFER_SIZE);
301 * Deserializes given byte buffer to Object using Kryo instance in pool.
303 * @param buffer input with serialized bytes
304 * @param <T> deserialized Object type
305 * @return deserialized Object
307 public <T> T deserialize(final ByteBuffer buffer) {
308 ByteBufferInput in = new ByteBufferInput(buffer);
309 Kryo kryo = borrow();
311 @SuppressWarnings("unchecked")
312 T obj = (T) kryo.readClassAndObject(in);
320 * Deserializes given InputStream to an Object using Kryo instance in pool.
322 * @param stream input stream
323 * @param <T> deserialized Object type
324 * @return deserialized Object
326 public <T> T deserialize(final InputStream stream) {
327 return deserialize(stream, DEFAULT_BUFFER_SIZE);
331 * Deserializes given InputStream to an Object using Kryo instance in pool.
333 * @param stream input stream
334 * @param <T> deserialized Object type
335 * @param bufferSize size of the buffer in front of the stream
336 * @return deserialized Object
338 public <T> T deserialize(final InputStream stream, final int bufferSize) {
339 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
340 Kryo kryo = borrow();
342 @SuppressWarnings("unchecked")
343 T obj = (T) kryo.readClassAndObject(in);
350 private String friendlyName() {
355 * Gets the number of classes registered in this Kryo namespace.
357 * @return size of namespace
360 return (int) registeredBlocks.stream()
361 .flatMap(block -> block.types().stream())
366 * Creates a Kryo instance.
368 * @return Kryo instance
371 public Kryo create() {
372 LOGGER.trace("Creating Kryo instance for {}", this);
373 Kryo kryo = new Kryo();
374 kryo.setClassLoader(classLoader);
375 kryo.setRegistrationRequired(registrationRequired);
377 // TODO rethink whether we want to use StdInstantiatorStrategy
378 kryo.setInstantiatorStrategy(
379 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
381 for (RegistrationBlock block : registeredBlocks) {
382 int id = block.begin();
383 if (id == FLOATING_ID) {
384 id = kryo.getNextRegistrationId();
386 for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
387 register(kryo, entry.getKey(), entry.getValue(), id++);
394 * Register {@code type} and {@code serializer} to {@code kryo} instance.
396 * @param kryo Kryo instance
397 * @param types types to register
398 * @param serializer Specific serializer to register or null to use default.
399 * @param id type registration id to use
401 private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
402 Registration existing = kryo.getRegistration(id);
403 if (existing != null) {
404 boolean matches = false;
405 for (Class<?> type : types) {
406 if (existing.getType() == type) {
413 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
414 friendlyName(), types, id, existing.getType());
416 throw new IllegalStateException(String.format(
417 "Failed to register %s as %s, %s was already registered.",
418 Arrays.toString(types), id, existing.getType()));
420 // falling through to register call for now.
421 // Consider skipping, if there's reasonable
422 // way to compare serializer equivalence.
425 for (Class<?> type : types) {
426 Registration r = null;
427 if (serializer == null) {
428 r = kryo.register(type, id);
429 } else if (type.isInterface()) {
430 kryo.addDefaultSerializer(type, serializer);
432 r = kryo.register(type, serializer, id);
435 if (r.getId() != id) {
436 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
437 friendlyName(), r.getType(), r.getId(), id);
439 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
445 public Kryo borrow() {
446 return kryoPool.borrow();
450 public void release(Kryo kryo) {
451 kryoPool.release(kryo);
455 public <T> T run(KryoCallback<T> callback) {
456 return kryoPool.run(callback);
460 public String toString() {
461 if (!NO_NAME.equals(friendlyName)) {
462 return MoreObjects.toStringHelper(getClass())
464 .add("friendlyName", friendlyName)
465 // omit lengthy detail, when there's a name
468 return MoreObjects.toStringHelper(getClass())
469 .add("registeredBlocks", registeredBlocks)
473 static final class RegistrationBlock {
474 private final int begin;
475 private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
477 RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
479 this.types = ImmutableList.copyOf(types);
486 public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
491 public String toString() {
492 return MoreObjects.toStringHelper(getClass())
499 public int hashCode() {
500 return types.hashCode();
503 // Only the registered types are used for equality.
505 public boolean equals(Object obj) {
510 if (obj instanceof RegistrationBlock) {
511 RegistrationBlock that = (RegistrationBlock) obj;
512 return Objects.equals(this.types, that.types);