b9eeaf269c9dad75bb06026c8af7a61474daf6c9
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.objenesis.strategy.StdInstantiatorStrategy;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.io.ByteArrayInputStream;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Map.Entry;
42 import java.util.Objects;
43
44 import static java.util.Objects.requireNonNull;
45
46 /**
47  * Pool of Kryo instances, with classes pre-registered.
48  */
49 //@ThreadSafe
50 public final class Namespace implements KryoFactory, KryoPool {
51
52   /**
53    * Default buffer size used for serialization.
54    *
55    * @see #serialize(Object)
56    */
57   public static final int DEFAULT_BUFFER_SIZE = 4096;
58
59   /**
60    * Maximum allowed buffer size.
61    */
62   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
63
64   /**
65    * ID to use if this KryoNamespace does not define registration id.
66    */
67   public static final int FLOATING_ID = -1;
68
69   /**
70    * Smallest ID free to use for user defined registrations.
71    */
72   public static final int INITIAL_ID = 16;
73
74   static final String NO_NAME = "(no name)";
75
76   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
77
78   /**
79    * Default Kryo namespace.
80    */
81   public static final Namespace DEFAULT = builder().build();
82
83   private final KryoPool kryoPool = new KryoPool.Builder(this)
84       .softReferences()
85       .build();
86
87   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
88   private final KryoInputPool kryoInputPool = new KryoInputPool();
89
90   private final ImmutableList<RegistrationBlock> registeredBlocks;
91
92   private final ClassLoader classLoader;
93   private final boolean compatible;
94   private final boolean registrationRequired;
95   private final String friendlyName;
96
97   /**
98    * KryoNamespace builder.
99    */
100   //@NotThreadSafe
101   public static final class Builder {
102     private int blockHeadId = INITIAL_ID;
103     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
104     private List<RegistrationBlock> blocks = new ArrayList<>();
105     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
106     private boolean registrationRequired = true;
107     private boolean compatible = false;
108
109     /**
110      * Builds a {@link Namespace} instance.
111      *
112      * @return KryoNamespace
113      */
114     public Namespace build() {
115       return build(NO_NAME);
116     }
117
118     /**
119      * Builds a {@link Namespace} instance.
120      *
121      * @param friendlyName friendly name for the namespace
122      * @return KryoNamespace
123      */
124     public Namespace build(String friendlyName) {
125       if (!types.isEmpty()) {
126         blocks.add(new RegistrationBlock(this.blockHeadId, types));
127       }
128       return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
129     }
130
131     /**
132      * Sets the next Kryo registration Id for following register entries.
133      *
134      * @param id Kryo registration Id
135      * @return this
136      * @see Kryo#register(Class, Serializer, int)
137      */
138     public Builder nextId(final int id) {
139       if (!types.isEmpty()) {
140         if (id != FLOATING_ID && id < blockHeadId + types.size()) {
141
142           if (LOGGER.isWarnEnabled()) {
143             LOGGER.warn("requested nextId {} could potentially overlap "
144                     + "with existing registrations {}+{} ",
145                 id, blockHeadId, types.size(), new RuntimeException());
146           }
147         }
148         blocks.add(new RegistrationBlock(this.blockHeadId, types));
149         types = new ArrayList<>();
150       }
151       this.blockHeadId = id;
152       return this;
153     }
154
155     /**
156      * Registers serializer for the given set of classes.
157      * <p>
158      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
159      * all instances will be serialized with the same type ID.
160      *
161      * @param classes    list of classes to register
162      * @param serializer serializer to use for the class
163      * @return this
164      */
165     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
166       types.add(Map.entry(classes, serializer));
167       return this;
168     }
169
170     /**
171      * Sets the namespace class loader.
172      *
173      * @param classLoader the namespace class loader
174      * @return the namespace builder
175      */
176     public Builder setClassLoader(ClassLoader classLoader) {
177       this.classLoader = classLoader;
178       return this;
179     }
180
181     /**
182      * Sets whether backwards/forwards compatible versioned serialization is enabled.
183      * <p>
184      * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
185      * default serializer for types that do not otherwise explicitly specify a serializer.
186      *
187      * @param compatible whether versioned serialization is enabled
188      * @return this
189      */
190     public Builder setCompatible(boolean compatible) {
191       this.compatible = compatible;
192       return this;
193     }
194
195     /**
196      * Sets the registrationRequired flag.
197      *
198      * @param registrationRequired Kryo's registrationRequired flag
199      * @return this
200      * @see Kryo#setRegistrationRequired(boolean)
201      */
202     public Builder setRegistrationRequired(boolean registrationRequired) {
203       this.registrationRequired = registrationRequired;
204       return this;
205     }
206   }
207
208   /**
209    * Creates a new {@link Namespace} builder.
210    *
211    * @return builder
212    */
213   public static Builder builder() {
214     return new Builder();
215   }
216
217   /**
218    * Creates a Kryo instance pool.
219    *
220    * @param registeredTypes      types to register
221    * @param registrationRequired whether registration is required
222    * @param compatible           whether compatible serialization is enabled
223    * @param friendlyName         friendly name for the namespace
224    */
225   private Namespace(
226       final List<RegistrationBlock> registeredTypes,
227       ClassLoader classLoader,
228       boolean registrationRequired,
229       boolean compatible,
230       String friendlyName) {
231     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
232     this.registrationRequired = registrationRequired;
233     this.classLoader = classLoader;
234     this.compatible = compatible;
235     this.friendlyName = requireNonNull(friendlyName);
236   }
237
238   /**
239    * Populates the Kryo pool.
240    *
241    * @param instances to add to the pool
242    * @return this
243    */
244   public Namespace populate(int instances) {
245
246     for (int i = 0; i < instances; ++i) {
247       release(create());
248     }
249     return this;
250   }
251
252   /**
253    * Serializes given object to byte array using Kryo instance in pool.
254    * <p>
255    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
256    *
257    * @param obj Object to serialize
258    * @return serialized bytes
259    */
260   public byte[] serialize(final Object obj) {
261     return serialize(obj, DEFAULT_BUFFER_SIZE);
262   }
263
264   /**
265    * Serializes given object to byte array using Kryo instance in pool.
266    *
267    * @param obj        Object to serialize
268    * @param bufferSize maximum size of serialized bytes
269    * @return serialized bytes
270    */
271   public byte[] serialize(final Object obj, final int bufferSize) {
272     return kryoOutputPool.run(output -> {
273       return kryoPool.run(kryo -> {
274         kryo.writeClassAndObject(output, obj);
275         output.flush();
276         return output.getByteArrayOutputStream().toByteArray();
277       });
278     }, bufferSize);
279   }
280
281   /**
282    * Serializes given object to byte buffer using Kryo instance in pool.
283    *
284    * @param obj    Object to serialize
285    * @param buffer to write to
286    */
287   public void serialize(final Object obj, final ByteBuffer buffer) {
288     ByteBufferOutput out = new ByteBufferOutput(buffer);
289     Kryo kryo = borrow();
290     try {
291       kryo.writeClassAndObject(out, obj);
292       out.flush();
293     } finally {
294       release(kryo);
295     }
296   }
297
298   /**
299    * Serializes given object to OutputStream using Kryo instance in pool.
300    *
301    * @param obj    Object to serialize
302    * @param stream to write to
303    */
304   public void serialize(final Object obj, final OutputStream stream) {
305     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
306   }
307
308   /**
309    * Serializes given object to OutputStream using Kryo instance in pool.
310    *
311    * @param obj        Object to serialize
312    * @param stream     to write to
313    * @param bufferSize size of the buffer in front of the stream
314    */
315   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
316     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
317     Kryo kryo = borrow();
318     try {
319       kryo.writeClassAndObject(out, obj);
320       out.flush();
321     } finally {
322       release(kryo);
323     }
324   }
325
326   /**
327    * Deserializes given byte array to Object using Kryo instance in pool.
328    *
329    * @param bytes serialized bytes
330    * @param <T>   deserialized Object type
331    * @return deserialized Object
332    */
333   public <T> T deserialize(final byte[] bytes) {
334     return kryoInputPool.run(input -> {
335       input.setInputStream(new ByteArrayInputStream(bytes));
336       return kryoPool.run(kryo -> {
337         @SuppressWarnings("unchecked")
338         T obj = (T) kryo.readClassAndObject(input);
339         return obj;
340       });
341     }, DEFAULT_BUFFER_SIZE);
342   }
343
344   /**
345    * Deserializes given byte buffer to Object using Kryo instance in pool.
346    *
347    * @param buffer input with serialized bytes
348    * @param <T>    deserialized Object type
349    * @return deserialized Object
350    */
351   public <T> T deserialize(final ByteBuffer buffer) {
352     ByteBufferInput in = new ByteBufferInput(buffer);
353     Kryo kryo = borrow();
354     try {
355       @SuppressWarnings("unchecked")
356       T obj = (T) kryo.readClassAndObject(in);
357       return obj;
358     } finally {
359       release(kryo);
360     }
361   }
362
363   /**
364    * Deserializes given InputStream to an Object using Kryo instance in pool.
365    *
366    * @param stream input stream
367    * @param <T>    deserialized Object type
368    * @return deserialized Object
369    */
370   public <T> T deserialize(final InputStream stream) {
371     return deserialize(stream, DEFAULT_BUFFER_SIZE);
372   }
373
374   /**
375    * Deserializes given InputStream to an Object using Kryo instance in pool.
376    *
377    * @param stream     input stream
378    * @param <T>        deserialized Object type
379    * @param bufferSize size of the buffer in front of the stream
380    * @return deserialized Object
381    */
382   public <T> T deserialize(final InputStream stream, final int bufferSize) {
383     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
384     Kryo kryo = borrow();
385     try {
386       @SuppressWarnings("unchecked")
387       T obj = (T) kryo.readClassAndObject(in);
388       return obj;
389     } finally {
390       release(kryo);
391     }
392   }
393
394   private String friendlyName() {
395     return friendlyName;
396   }
397
398   /**
399    * Gets the number of classes registered in this Kryo namespace.
400    *
401    * @return size of namespace
402    */
403   public int size() {
404     return (int) registeredBlocks.stream()
405         .flatMap(block -> block.types().stream())
406         .count();
407   }
408
409   /**
410    * Creates a Kryo instance.
411    *
412    * @return Kryo instance
413    */
414   @Override
415   public Kryo create() {
416     LOGGER.trace("Creating Kryo instance for {}", this);
417     Kryo kryo = new Kryo();
418     kryo.setClassLoader(classLoader);
419     kryo.setRegistrationRequired(registrationRequired);
420
421     // If compatible serialization is enabled, override the default serializer.
422     if (compatible) {
423       kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
424     }
425
426     // TODO rethink whether we want to use StdInstantiatorStrategy
427     kryo.setInstantiatorStrategy(
428         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
429
430     for (RegistrationBlock block : registeredBlocks) {
431       int id = block.begin();
432       if (id == FLOATING_ID) {
433         id = kryo.getNextRegistrationId();
434       }
435       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
436         register(kryo, entry.getKey(), entry.getValue(), id++);
437       }
438     }
439     return kryo;
440   }
441
442   /**
443    * Register {@code type} and {@code serializer} to {@code kryo} instance.
444    *
445    * @param kryo       Kryo instance
446    * @param types      types to register
447    * @param serializer Specific serializer to register or null to use default.
448    * @param id         type registration id to use
449    */
450   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
451     Registration existing = kryo.getRegistration(id);
452     if (existing != null) {
453       boolean matches = false;
454       for (Class<?> type : types) {
455         if (existing.getType() == type) {
456           matches = true;
457           break;
458         }
459       }
460
461       if (!matches) {
462         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
463             friendlyName(), types, id, existing.getType());
464
465         throw new IllegalStateException(String.format(
466             "Failed to register %s as %s, %s was already registered.",
467             Arrays.toString(types), id, existing.getType()));
468       }
469       // falling through to register call for now.
470       // Consider skipping, if there's reasonable
471       // way to compare serializer equivalence.
472     }
473
474     for (Class<?> type : types) {
475       Registration r = null;
476       if (serializer == null) {
477         r = kryo.register(type, id);
478       } else if (type.isInterface()) {
479         kryo.addDefaultSerializer(type, serializer);
480       } else {
481         r = kryo.register(type, serializer, id);
482       }
483       if (r != null) {
484         if (r.getId() != id) {
485           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
486               friendlyName(), r.getType(), r.getId(), id);
487         }
488         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
489       }
490     }
491   }
492
493   @Override
494   public Kryo borrow() {
495     return kryoPool.borrow();
496   }
497
498   @Override
499   public void release(Kryo kryo) {
500     kryoPool.release(kryo);
501   }
502
503   @Override
504   public <T> T run(KryoCallback<T> callback) {
505     return kryoPool.run(callback);
506   }
507
508   @Override
509   public String toString() {
510     if (!NO_NAME.equals(friendlyName)) {
511       return MoreObjects.toStringHelper(getClass())
512           .omitNullValues()
513           .add("friendlyName", friendlyName)
514           // omit lengthy detail, when there's a name
515           .toString();
516     }
517     return MoreObjects.toStringHelper(getClass())
518         .add("registeredBlocks", registeredBlocks)
519         .toString();
520   }
521
522   static final class RegistrationBlock {
523     private final int begin;
524     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
525
526     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
527       this.begin = begin;
528       this.types = ImmutableList.copyOf(types);
529     }
530
531     public int begin() {
532       return begin;
533     }
534
535     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
536       return types;
537     }
538
539     @Override
540     public String toString() {
541       return MoreObjects.toStringHelper(getClass())
542           .add("begin", begin)
543           .add("types", types)
544           .toString();
545     }
546
547     @Override
548     public int hashCode() {
549       return types.hashCode();
550     }
551
552     // Only the registered types are used for equality.
553     @Override
554     public boolean equals(Object obj) {
555       if (this == obj) {
556         return true;
557       }
558
559       if (obj instanceof RegistrationBlock) {
560         RegistrationBlock that = (RegistrationBlock) obj;
561         return Objects.equals(this.types, that.types);
562       }
563       return false;
564     }
565   }
566 }