Remove Namespace.Builder.nextId()
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.objenesis.strategy.StdInstantiatorStrategy;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.io.ByteArrayInputStream;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Map.Entry;
42 import java.util.Objects;
43
44 import static java.util.Objects.requireNonNull;
45
46 /**
47  * Pool of Kryo instances, with classes pre-registered.
48  */
49 //@ThreadSafe
50 public final class Namespace implements KryoFactory, KryoPool {
51
52   /**
53    * Default buffer size used for serialization.
54    *
55    * @see #serialize(Object)
56    */
57   public static final int DEFAULT_BUFFER_SIZE = 4096;
58
59   /**
60    * Maximum allowed buffer size.
61    */
62   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
63
64   /**
65    * ID to use if this KryoNamespace does not define registration id.
66    */
67   private static final int FLOATING_ID = -1;
68
69   /**
70    * Smallest ID free to use for user defined registrations.
71    */
72   private static final int INITIAL_ID = 16;
73
74   static final String NO_NAME = "(no name)";
75
76   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
77
78   /**
79    * Default Kryo namespace.
80    */
81   public static final Namespace DEFAULT = builder().build();
82
83   private final KryoPool kryoPool = new KryoPool.Builder(this)
84       .softReferences()
85       .build();
86
87   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
88   private final KryoInputPool kryoInputPool = new KryoInputPool();
89
90   private final ImmutableList<RegistrationBlock> registeredBlocks;
91
92   private final ClassLoader classLoader;
93   private final boolean compatible;
94   private final boolean registrationRequired;
95   private final String friendlyName;
96
97   /**
98    * KryoNamespace builder.
99    */
100   //@NotThreadSafe
101   public static final class Builder {
102     private int blockHeadId = INITIAL_ID;
103     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
104     private List<RegistrationBlock> blocks = new ArrayList<>();
105     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
106     private boolean registrationRequired = true;
107     private boolean compatible = false;
108
109     /**
110      * Builds a {@link Namespace} instance.
111      *
112      * @return KryoNamespace
113      */
114     public Namespace build() {
115       return build(NO_NAME);
116     }
117
118     /**
119      * Builds a {@link Namespace} instance.
120      *
121      * @param friendlyName friendly name for the namespace
122      * @return KryoNamespace
123      */
124     public Namespace build(String friendlyName) {
125       if (!types.isEmpty()) {
126         blocks.add(new RegistrationBlock(this.blockHeadId, types));
127       }
128       return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
129     }
130
131     /**
132      * Registers serializer for the given set of classes.
133      * <p>
134      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
135      * all instances will be serialized with the same type ID.
136      *
137      * @param classes    list of classes to register
138      * @param serializer serializer to use for the class
139      * @return this
140      */
141     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
142       types.add(Map.entry(classes, serializer));
143       return this;
144     }
145
146     /**
147      * Sets the namespace class loader.
148      *
149      * @param classLoader the namespace class loader
150      * @return the namespace builder
151      */
152     public Builder setClassLoader(ClassLoader classLoader) {
153       this.classLoader = classLoader;
154       return this;
155     }
156
157     /**
158      * Sets whether backwards/forwards compatible versioned serialization is enabled.
159      * <p>
160      * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
161      * default serializer for types that do not otherwise explicitly specify a serializer.
162      *
163      * @param compatible whether versioned serialization is enabled
164      * @return this
165      */
166     public Builder setCompatible(boolean compatible) {
167       this.compatible = compatible;
168       return this;
169     }
170
171     /**
172      * Sets the registrationRequired flag.
173      *
174      * @param registrationRequired Kryo's registrationRequired flag
175      * @return this
176      * @see Kryo#setRegistrationRequired(boolean)
177      */
178     public Builder setRegistrationRequired(boolean registrationRequired) {
179       this.registrationRequired = registrationRequired;
180       return this;
181     }
182   }
183
184   /**
185    * Creates a new {@link Namespace} builder.
186    *
187    * @return builder
188    */
189   public static Builder builder() {
190     return new Builder();
191   }
192
193   /**
194    * Creates a Kryo instance pool.
195    *
196    * @param registeredTypes      types to register
197    * @param registrationRequired whether registration is required
198    * @param compatible           whether compatible serialization is enabled
199    * @param friendlyName         friendly name for the namespace
200    */
201   private Namespace(
202       final List<RegistrationBlock> registeredTypes,
203       ClassLoader classLoader,
204       boolean registrationRequired,
205       boolean compatible,
206       String friendlyName) {
207     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
208     this.registrationRequired = registrationRequired;
209     this.classLoader = classLoader;
210     this.compatible = compatible;
211     this.friendlyName = requireNonNull(friendlyName);
212   }
213
214   /**
215    * Populates the Kryo pool.
216    *
217    * @param instances to add to the pool
218    * @return this
219    */
220   public Namespace populate(int instances) {
221
222     for (int i = 0; i < instances; ++i) {
223       release(create());
224     }
225     return this;
226   }
227
228   /**
229    * Serializes given object to byte array using Kryo instance in pool.
230    * <p>
231    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
232    *
233    * @param obj Object to serialize
234    * @return serialized bytes
235    */
236   public byte[] serialize(final Object obj) {
237     return serialize(obj, DEFAULT_BUFFER_SIZE);
238   }
239
240   /**
241    * Serializes given object to byte array using Kryo instance in pool.
242    *
243    * @param obj        Object to serialize
244    * @param bufferSize maximum size of serialized bytes
245    * @return serialized bytes
246    */
247   public byte[] serialize(final Object obj, final int bufferSize) {
248     return kryoOutputPool.run(output -> {
249       return kryoPool.run(kryo -> {
250         kryo.writeClassAndObject(output, obj);
251         output.flush();
252         return output.getByteArrayOutputStream().toByteArray();
253       });
254     }, bufferSize);
255   }
256
257   /**
258    * Serializes given object to byte buffer using Kryo instance in pool.
259    *
260    * @param obj    Object to serialize
261    * @param buffer to write to
262    */
263   public void serialize(final Object obj, final ByteBuffer buffer) {
264     ByteBufferOutput out = new ByteBufferOutput(buffer);
265     Kryo kryo = borrow();
266     try {
267       kryo.writeClassAndObject(out, obj);
268       out.flush();
269     } finally {
270       release(kryo);
271     }
272   }
273
274   /**
275    * Serializes given object to OutputStream using Kryo instance in pool.
276    *
277    * @param obj    Object to serialize
278    * @param stream to write to
279    */
280   public void serialize(final Object obj, final OutputStream stream) {
281     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
282   }
283
284   /**
285    * Serializes given object to OutputStream using Kryo instance in pool.
286    *
287    * @param obj        Object to serialize
288    * @param stream     to write to
289    * @param bufferSize size of the buffer in front of the stream
290    */
291   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
292     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
293     Kryo kryo = borrow();
294     try {
295       kryo.writeClassAndObject(out, obj);
296       out.flush();
297     } finally {
298       release(kryo);
299     }
300   }
301
302   /**
303    * Deserializes given byte array to Object using Kryo instance in pool.
304    *
305    * @param bytes serialized bytes
306    * @param <T>   deserialized Object type
307    * @return deserialized Object
308    */
309   public <T> T deserialize(final byte[] bytes) {
310     return kryoInputPool.run(input -> {
311       input.setInputStream(new ByteArrayInputStream(bytes));
312       return kryoPool.run(kryo -> {
313         @SuppressWarnings("unchecked")
314         T obj = (T) kryo.readClassAndObject(input);
315         return obj;
316       });
317     }, DEFAULT_BUFFER_SIZE);
318   }
319
320   /**
321    * Deserializes given byte buffer to Object using Kryo instance in pool.
322    *
323    * @param buffer input with serialized bytes
324    * @param <T>    deserialized Object type
325    * @return deserialized Object
326    */
327   public <T> T deserialize(final ByteBuffer buffer) {
328     ByteBufferInput in = new ByteBufferInput(buffer);
329     Kryo kryo = borrow();
330     try {
331       @SuppressWarnings("unchecked")
332       T obj = (T) kryo.readClassAndObject(in);
333       return obj;
334     } finally {
335       release(kryo);
336     }
337   }
338
339   /**
340    * Deserializes given InputStream to an Object using Kryo instance in pool.
341    *
342    * @param stream input stream
343    * @param <T>    deserialized Object type
344    * @return deserialized Object
345    */
346   public <T> T deserialize(final InputStream stream) {
347     return deserialize(stream, DEFAULT_BUFFER_SIZE);
348   }
349
350   /**
351    * Deserializes given InputStream to an Object using Kryo instance in pool.
352    *
353    * @param stream     input stream
354    * @param <T>        deserialized Object type
355    * @param bufferSize size of the buffer in front of the stream
356    * @return deserialized Object
357    */
358   public <T> T deserialize(final InputStream stream, final int bufferSize) {
359     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
360     Kryo kryo = borrow();
361     try {
362       @SuppressWarnings("unchecked")
363       T obj = (T) kryo.readClassAndObject(in);
364       return obj;
365     } finally {
366       release(kryo);
367     }
368   }
369
370   private String friendlyName() {
371     return friendlyName;
372   }
373
374   /**
375    * Gets the number of classes registered in this Kryo namespace.
376    *
377    * @return size of namespace
378    */
379   public int size() {
380     return (int) registeredBlocks.stream()
381         .flatMap(block -> block.types().stream())
382         .count();
383   }
384
385   /**
386    * Creates a Kryo instance.
387    *
388    * @return Kryo instance
389    */
390   @Override
391   public Kryo create() {
392     LOGGER.trace("Creating Kryo instance for {}", this);
393     Kryo kryo = new Kryo();
394     kryo.setClassLoader(classLoader);
395     kryo.setRegistrationRequired(registrationRequired);
396
397     // If compatible serialization is enabled, override the default serializer.
398     if (compatible) {
399       kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
400     }
401
402     // TODO rethink whether we want to use StdInstantiatorStrategy
403     kryo.setInstantiatorStrategy(
404         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
405
406     for (RegistrationBlock block : registeredBlocks) {
407       int id = block.begin();
408       if (id == FLOATING_ID) {
409         id = kryo.getNextRegistrationId();
410       }
411       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
412         register(kryo, entry.getKey(), entry.getValue(), id++);
413       }
414     }
415     return kryo;
416   }
417
418   /**
419    * Register {@code type} and {@code serializer} to {@code kryo} instance.
420    *
421    * @param kryo       Kryo instance
422    * @param types      types to register
423    * @param serializer Specific serializer to register or null to use default.
424    * @param id         type registration id to use
425    */
426   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
427     Registration existing = kryo.getRegistration(id);
428     if (existing != null) {
429       boolean matches = false;
430       for (Class<?> type : types) {
431         if (existing.getType() == type) {
432           matches = true;
433           break;
434         }
435       }
436
437       if (!matches) {
438         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
439             friendlyName(), types, id, existing.getType());
440
441         throw new IllegalStateException(String.format(
442             "Failed to register %s as %s, %s was already registered.",
443             Arrays.toString(types), id, existing.getType()));
444       }
445       // falling through to register call for now.
446       // Consider skipping, if there's reasonable
447       // way to compare serializer equivalence.
448     }
449
450     for (Class<?> type : types) {
451       Registration r = null;
452       if (serializer == null) {
453         r = kryo.register(type, id);
454       } else if (type.isInterface()) {
455         kryo.addDefaultSerializer(type, serializer);
456       } else {
457         r = kryo.register(type, serializer, id);
458       }
459       if (r != null) {
460         if (r.getId() != id) {
461           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
462               friendlyName(), r.getType(), r.getId(), id);
463         }
464         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
465       }
466     }
467   }
468
469   @Override
470   public Kryo borrow() {
471     return kryoPool.borrow();
472   }
473
474   @Override
475   public void release(Kryo kryo) {
476     kryoPool.release(kryo);
477   }
478
479   @Override
480   public <T> T run(KryoCallback<T> callback) {
481     return kryoPool.run(callback);
482   }
483
484   @Override
485   public String toString() {
486     if (!NO_NAME.equals(friendlyName)) {
487       return MoreObjects.toStringHelper(getClass())
488           .omitNullValues()
489           .add("friendlyName", friendlyName)
490           // omit lengthy detail, when there's a name
491           .toString();
492     }
493     return MoreObjects.toStringHelper(getClass())
494         .add("registeredBlocks", registeredBlocks)
495         .toString();
496   }
497
498   static final class RegistrationBlock {
499     private final int begin;
500     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
501
502     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
503       this.begin = begin;
504       this.types = ImmutableList.copyOf(types);
505     }
506
507     public int begin() {
508       return begin;
509     }
510
511     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
512       return types;
513     }
514
515     @Override
516     public String toString() {
517       return MoreObjects.toStringHelper(getClass())
518           .add("begin", begin)
519           .add("types", types)
520           .toString();
521     }
522
523     @Override
524     public int hashCode() {
525       return types.hashCode();
526     }
527
528     // Only the registered types are used for equality.
529     @Override
530     public boolean equals(Object obj) {
531       if (this == obj) {
532         return true;
533       }
534
535       if (obj instanceof RegistrationBlock) {
536         RegistrationBlock that = (RegistrationBlock) obj;
537         return Objects.equals(this.types, that.types);
538       }
539       return false;
540     }
541   }
542 }