Do not fall back to default Kryo serializers
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.apache.commons.lang3.tuple.Pair;
30 import org.objenesis.strategy.StdInstantiatorStrategy;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import java.io.ByteArrayInputStream;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.nio.ByteBuffer;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.List;
41 import java.util.Objects;
42
43 import static java.util.Objects.requireNonNull;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * Maximum allowed buffer size.
60    */
61   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
62
63   /**
64    * ID to use if this KryoNamespace does not define registration id.
65    */
66   public static final int FLOATING_ID = -1;
67
68   /**
69    * Smallest ID free to use for user defined registrations.
70    */
71   public static final int INITIAL_ID = 16;
72
73   static final String NO_NAME = "(no name)";
74
75   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
76
77   /**
78    * Default Kryo namespace.
79    */
80   public static final Namespace DEFAULT = builder().build();
81
82   private final KryoPool kryoPool = new KryoPool.Builder(this)
83       .softReferences()
84       .build();
85
86   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
87   private final KryoInputPool kryoInputPool = new KryoInputPool();
88
89   private final ImmutableList<RegistrationBlock> registeredBlocks;
90
91   private final ClassLoader classLoader;
92   private final boolean compatible;
93   private final boolean registrationRequired;
94   private final String friendlyName;
95
96   /**
97    * KryoNamespace builder.
98    */
99   //@NotThreadSafe
100   public static final class Builder {
101     private int blockHeadId = INITIAL_ID;
102     private List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
103     private List<RegistrationBlock> blocks = new ArrayList<>();
104     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
105     private boolean registrationRequired = true;
106     private boolean compatible = false;
107
108     /**
109      * Builds a {@link Namespace} instance.
110      *
111      * @return KryoNamespace
112      */
113     public Namespace build() {
114       return build(NO_NAME);
115     }
116
117     /**
118      * Builds a {@link Namespace} instance.
119      *
120      * @param friendlyName friendly name for the namespace
121      * @return KryoNamespace
122      */
123     public Namespace build(String friendlyName) {
124       if (!types.isEmpty()) {
125         blocks.add(new RegistrationBlock(this.blockHeadId, types));
126       }
127       return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
128     }
129
130     /**
131      * Sets the next Kryo registration Id for following register entries.
132      *
133      * @param id Kryo registration Id
134      * @return this
135      * @see Kryo#register(Class, Serializer, int)
136      */
137     public Builder nextId(final int id) {
138       if (!types.isEmpty()) {
139         if (id != FLOATING_ID && id < blockHeadId + types.size()) {
140
141           if (LOGGER.isWarnEnabled()) {
142             LOGGER.warn("requested nextId {} could potentially overlap "
143                     + "with existing registrations {}+{} ",
144                 id, blockHeadId, types.size(), new RuntimeException());
145           }
146         }
147         blocks.add(new RegistrationBlock(this.blockHeadId, types));
148         types = new ArrayList<>();
149       }
150       this.blockHeadId = id;
151       return this;
152     }
153
154     /**
155      * Registers serializer for the given set of classes.
156      * <p>
157      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
158      * all instances will be serialized with the same type ID.
159      *
160      * @param classes    list of classes to register
161      * @param serializer serializer to use for the class
162      * @return this
163      */
164     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
165       types.add(Pair.of(classes, requireNonNull(serializer)));
166       return this;
167     }
168
169     /**
170      * Sets the namespace class loader.
171      *
172      * @param classLoader the namespace class loader
173      * @return the namespace builder
174      */
175     public Builder setClassLoader(ClassLoader classLoader) {
176       this.classLoader = classLoader;
177       return this;
178     }
179
180     /**
181      * Sets whether backwards/forwards compatible versioned serialization is enabled.
182      * <p>
183      * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
184      * default serializer for types that do not otherwise explicitly specify a serializer.
185      *
186      * @param compatible whether versioned serialization is enabled
187      * @return this
188      */
189     public Builder setCompatible(boolean compatible) {
190       this.compatible = compatible;
191       return this;
192     }
193
194     /**
195      * Sets the registrationRequired flag.
196      *
197      * @param registrationRequired Kryo's registrationRequired flag
198      * @return this
199      * @see Kryo#setRegistrationRequired(boolean)
200      */
201     public Builder setRegistrationRequired(boolean registrationRequired) {
202       this.registrationRequired = registrationRequired;
203       return this;
204     }
205   }
206
207   /**
208    * Creates a new {@link Namespace} builder.
209    *
210    * @return builder
211    */
212   public static Builder builder() {
213     return new Builder();
214   }
215
216   /**
217    * Creates a Kryo instance pool.
218    *
219    * @param registeredTypes      types to register
220    * @param registrationRequired whether registration is required
221    * @param compatible           whether compatible serialization is enabled
222    * @param friendlyName         friendly name for the namespace
223    */
224   private Namespace(
225       final List<RegistrationBlock> registeredTypes,
226       ClassLoader classLoader,
227       boolean registrationRequired,
228       boolean compatible,
229       String friendlyName) {
230     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
231     this.registrationRequired = registrationRequired;
232     this.classLoader = classLoader;
233     this.compatible = compatible;
234     this.friendlyName = requireNonNull(friendlyName);
235   }
236
237   /**
238    * Populates the Kryo pool.
239    *
240    * @param instances to add to the pool
241    * @return this
242    */
243   public Namespace populate(int instances) {
244
245     for (int i = 0; i < instances; ++i) {
246       release(create());
247     }
248     return this;
249   }
250
251   /**
252    * Serializes given object to byte array using Kryo instance in pool.
253    * <p>
254    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
255    *
256    * @param obj Object to serialize
257    * @return serialized bytes
258    */
259   public byte[] serialize(final Object obj) {
260     return serialize(obj, DEFAULT_BUFFER_SIZE);
261   }
262
263   /**
264    * Serializes given object to byte array using Kryo instance in pool.
265    *
266    * @param obj        Object to serialize
267    * @param bufferSize maximum size of serialized bytes
268    * @return serialized bytes
269    */
270   public byte[] serialize(final Object obj, final int bufferSize) {
271     return kryoOutputPool.run(output -> {
272       return kryoPool.run(kryo -> {
273         kryo.writeClassAndObject(output, obj);
274         output.flush();
275         return output.getByteArrayOutputStream().toByteArray();
276       });
277     }, bufferSize);
278   }
279
280   /**
281    * Serializes given object to byte buffer using Kryo instance in pool.
282    *
283    * @param obj    Object to serialize
284    * @param buffer to write to
285    */
286   public void serialize(final Object obj, final ByteBuffer buffer) {
287     ByteBufferOutput out = new ByteBufferOutput(buffer);
288     Kryo kryo = borrow();
289     try {
290       kryo.writeClassAndObject(out, obj);
291       out.flush();
292     } finally {
293       release(kryo);
294     }
295   }
296
297   /**
298    * Serializes given object to OutputStream using Kryo instance in pool.
299    *
300    * @param obj    Object to serialize
301    * @param stream to write to
302    */
303   public void serialize(final Object obj, final OutputStream stream) {
304     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
305   }
306
307   /**
308    * Serializes given object to OutputStream using Kryo instance in pool.
309    *
310    * @param obj        Object to serialize
311    * @param stream     to write to
312    * @param bufferSize size of the buffer in front of the stream
313    */
314   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
315     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
316     Kryo kryo = borrow();
317     try {
318       kryo.writeClassAndObject(out, obj);
319       out.flush();
320     } finally {
321       release(kryo);
322     }
323   }
324
325   /**
326    * Deserializes given byte array to Object using Kryo instance in pool.
327    *
328    * @param bytes serialized bytes
329    * @param <T>   deserialized Object type
330    * @return deserialized Object
331    */
332   public <T> T deserialize(final byte[] bytes) {
333     return kryoInputPool.run(input -> {
334       input.setInputStream(new ByteArrayInputStream(bytes));
335       return kryoPool.run(kryo -> {
336         @SuppressWarnings("unchecked")
337         T obj = (T) kryo.readClassAndObject(input);
338         return obj;
339       });
340     }, DEFAULT_BUFFER_SIZE);
341   }
342
343   /**
344    * Deserializes given byte buffer to Object using Kryo instance in pool.
345    *
346    * @param buffer input with serialized bytes
347    * @param <T>    deserialized Object type
348    * @return deserialized Object
349    */
350   public <T> T deserialize(final ByteBuffer buffer) {
351     ByteBufferInput in = new ByteBufferInput(buffer);
352     Kryo kryo = borrow();
353     try {
354       @SuppressWarnings("unchecked")
355       T obj = (T) kryo.readClassAndObject(in);
356       return obj;
357     } finally {
358       release(kryo);
359     }
360   }
361
362   /**
363    * Deserializes given InputStream to an Object using Kryo instance in pool.
364    *
365    * @param stream input stream
366    * @param <T>    deserialized Object type
367    * @return deserialized Object
368    */
369   public <T> T deserialize(final InputStream stream) {
370     return deserialize(stream, DEFAULT_BUFFER_SIZE);
371   }
372
373   /**
374    * Deserializes given InputStream to an Object using Kryo instance in pool.
375    *
376    * @param stream     input stream
377    * @param <T>        deserialized Object type
378    * @param bufferSize size of the buffer in front of the stream
379    * @return deserialized Object
380    */
381   public <T> T deserialize(final InputStream stream, final int bufferSize) {
382     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
383     Kryo kryo = borrow();
384     try {
385       @SuppressWarnings("unchecked")
386       T obj = (T) kryo.readClassAndObject(in);
387       return obj;
388     } finally {
389       release(kryo);
390     }
391   }
392
393   private String friendlyName() {
394     return friendlyName;
395   }
396
397   /**
398    * Gets the number of classes registered in this Kryo namespace.
399    *
400    * @return size of namespace
401    */
402   public int size() {
403     return (int) registeredBlocks.stream()
404         .flatMap(block -> block.types().stream())
405         .count();
406   }
407
408   /**
409    * Creates a Kryo instance.
410    *
411    * @return Kryo instance
412    */
413   @Override
414   public Kryo create() {
415     LOGGER.trace("Creating Kryo instance for {}", this);
416     Kryo kryo = new Kryo();
417     kryo.setClassLoader(classLoader);
418     kryo.setRegistrationRequired(registrationRequired);
419
420     // If compatible serialization is enabled, override the default serializer.
421     if (compatible) {
422       kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
423     }
424
425     // TODO rethink whether we want to use StdInstantiatorStrategy
426     kryo.setInstantiatorStrategy(
427         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
428
429     for (RegistrationBlock block : registeredBlocks) {
430       int id = block.begin();
431       if (id == FLOATING_ID) {
432         id = kryo.getNextRegistrationId();
433       }
434       for (Pair<Class<?>[], Serializer<?>> entry : block.types()) {
435         register(kryo, entry.getLeft(), entry.getRight(), id++);
436       }
437     }
438     return kryo;
439   }
440
441   /**
442    * Register {@code type} and {@code serializer} to {@code kryo} instance.
443    *
444    * @param kryo       Kryo instance
445    * @param types      types to register
446    * @param serializer Specific serializer to register or null to use default.
447    * @param id         type registration id to use
448    */
449   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
450     Registration existing = kryo.getRegistration(id);
451     if (existing != null) {
452       boolean matches = false;
453       for (Class<?> type : types) {
454         if (existing.getType() == type) {
455           matches = true;
456           break;
457         }
458       }
459
460       if (!matches) {
461         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
462             friendlyName(), types, id, existing.getType());
463
464         throw new IllegalStateException(String.format(
465             "Failed to register %s as %s, %s was already registered.",
466             Arrays.toString(types), id, existing.getType()));
467       }
468       // falling through to register call for now.
469       // Consider skipping, if there's reasonable
470       // way to compare serializer equivalence.
471     }
472
473     for (Class<?> type : types) {
474       Registration r = null;
475       if (serializer == null) {
476         r = kryo.register(type, id);
477       } else if (type.isInterface()) {
478         kryo.addDefaultSerializer(type, serializer);
479       } else {
480         r = kryo.register(type, serializer, id);
481       }
482       if (r != null) {
483         if (r.getId() != id) {
484           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
485               friendlyName(), r.getType(), r.getId(), id);
486         }
487         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
488       }
489     }
490   }
491
492   @Override
493   public Kryo borrow() {
494     return kryoPool.borrow();
495   }
496
497   @Override
498   public void release(Kryo kryo) {
499     kryoPool.release(kryo);
500   }
501
502   @Override
503   public <T> T run(KryoCallback<T> callback) {
504     return kryoPool.run(callback);
505   }
506
507   @Override
508   public String toString() {
509     if (!NO_NAME.equals(friendlyName)) {
510       return MoreObjects.toStringHelper(getClass())
511           .omitNullValues()
512           .add("friendlyName", friendlyName)
513           // omit lengthy detail, when there's a name
514           .toString();
515     }
516     return MoreObjects.toStringHelper(getClass())
517         .add("registeredBlocks", registeredBlocks)
518         .toString();
519   }
520
521   static final class RegistrationBlock {
522     private final int begin;
523     private final ImmutableList<Pair<Class<?>[], Serializer<?>>> types;
524
525     RegistrationBlock(int begin, List<Pair<Class<?>[], Serializer<?>>> types) {
526       this.begin = begin;
527       this.types = ImmutableList.copyOf(types);
528     }
529
530     public int begin() {
531       return begin;
532     }
533
534     public ImmutableList<Pair<Class<?>[], Serializer<?>>> types() {
535       return types;
536     }
537
538     @Override
539     public String toString() {
540       return MoreObjects.toStringHelper(getClass())
541           .add("begin", begin)
542           .add("types", types)
543           .toString();
544     }
545
546     @Override
547     public int hashCode() {
548       return types.hashCode();
549     }
550
551     // Only the registered types are used for equality.
552     @Override
553     public boolean equals(Object obj) {
554       if (this == obj) {
555         return true;
556       }
557
558       if (obj instanceof RegistrationBlock) {
559         RegistrationBlock that = (RegistrationBlock) obj;
560         return Objects.equals(this.types, that.types);
561       }
562       return false;
563     }
564   }
565 }