Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import io.atomix.utils.config.ConfigurationException;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.objenesis.strategy.StdInstantiatorStrategy;
32 import org.slf4j.Logger;
33
34 import java.io.ByteArrayInputStream;
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.nio.ByteBuffer;
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.List;
42 import java.util.Objects;
43
44 import static com.google.common.base.Preconditions.checkNotNull;
45 import static org.slf4j.LoggerFactory.getLogger;
46
47 /**
48  * Pool of Kryo instances, with classes pre-registered.
49  */
50 //@ThreadSafe
51 public final class Namespace implements KryoFactory, KryoPool {
52
53   /**
54    * Default buffer size used for serialization.
55    *
56    * @see #serialize(Object)
57    */
58   public static final int DEFAULT_BUFFER_SIZE = 4096;
59
60   /**
61    * Maximum allowed buffer size.
62    */
63   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
64
65   /**
66    * ID to use if this KryoNamespace does not define registration id.
67    */
68   public static final int FLOATING_ID = -1;
69
70   /**
71    * Smallest ID free to use for user defined registrations.
72    */
73   public static final int INITIAL_ID = 16;
74
75   static final String NO_NAME = "(no name)";
76
77   private static final Logger LOGGER = getLogger(Namespace.class);
78
79   /**
80    * Default Kryo namespace.
81    */
82   public static final Namespace DEFAULT = builder().build();
83
84   private final KryoPool kryoPool = new KryoPool.Builder(this)
85       .softReferences()
86       .build();
87
88   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
89   private final KryoInputPool kryoInputPool = new KryoInputPool();
90
91   private final ImmutableList<RegistrationBlock> registeredBlocks;
92
93   private final ClassLoader classLoader;
94   private final boolean compatible;
95   private final boolean registrationRequired;
96   private final String friendlyName;
97
98   /**
99    * KryoNamespace builder.
100    */
101   //@NotThreadSafe
102   public static final class Builder {
103     private int blockHeadId = INITIAL_ID;
104     private List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
105     private List<RegistrationBlock> blocks = new ArrayList<>();
106     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
107     private boolean registrationRequired = true;
108     private boolean compatible = false;
109
110     /**
111      * Builds a {@link Namespace} instance.
112      *
113      * @return KryoNamespace
114      */
115     public Namespace build() {
116       return build(NO_NAME);
117     }
118
119     /**
120      * Builds a {@link Namespace} instance.
121      *
122      * @param friendlyName friendly name for the namespace
123      * @return KryoNamespace
124      */
125     public Namespace build(String friendlyName) {
126       if (!types.isEmpty()) {
127         blocks.add(new RegistrationBlock(this.blockHeadId, types));
128       }
129       return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
130     }
131
132     /**
133      * Sets the next Kryo registration Id for following register entries.
134      *
135      * @param id Kryo registration Id
136      * @return this
137      * @see Kryo#register(Class, Serializer, int)
138      */
139     public Builder nextId(final int id) {
140       if (!types.isEmpty()) {
141         if (id != FLOATING_ID && id < blockHeadId + types.size()) {
142
143           if (LOGGER.isWarnEnabled()) {
144             LOGGER.warn("requested nextId {} could potentially overlap "
145                     + "with existing registrations {}+{} ",
146                 id, blockHeadId, types.size(), new RuntimeException());
147           }
148         }
149         blocks.add(new RegistrationBlock(this.blockHeadId, types));
150         types = new ArrayList<>();
151       }
152       this.blockHeadId = id;
153       return this;
154     }
155
156     /**
157      * Registers classes to be serialized using Kryo default serializer.
158      *
159      * @param expectedTypes list of classes
160      * @return this
161      */
162     public Builder register(final Class<?>... expectedTypes) {
163       for (Class<?> clazz : expectedTypes) {
164         types.add(Pair.of(new Class<?>[]{clazz}, null));
165       }
166       return this;
167     }
168
169     /**
170      * Registers serializer for the given set of classes.
171      * <p>
172      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
173      * all instances will be serialized with the same type ID.
174      *
175      * @param classes    list of classes to register
176      * @param serializer serializer to use for the class
177      * @return this
178      */
179     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
180       types.add(Pair.of(classes, checkNotNull(serializer)));
181       return this;
182     }
183
184     private Builder register(RegistrationBlock block) {
185       if (block.begin() != FLOATING_ID) {
186         // flush pending types
187         nextId(block.begin());
188         blocks.add(block);
189         nextId(block.begin() + block.types().size());
190       } else {
191         // flush pending types
192         final int addedBlockBegin = blockHeadId + types.size();
193         nextId(addedBlockBegin);
194         blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
195         nextId(addedBlockBegin + block.types().size());
196       }
197       return this;
198     }
199
200     /**
201      * Registers all the class registered to given KryoNamespace.
202      *
203      * @param ns KryoNamespace
204      * @return this
205      */
206     public Builder register(final Namespace ns) {
207
208       if (blocks.containsAll(ns.registeredBlocks)) {
209         // Everything was already registered.
210         LOGGER.debug("Ignoring {}, already registered.", ns);
211         return this;
212       }
213       for (RegistrationBlock block : ns.registeredBlocks) {
214         this.register(block);
215       }
216       return this;
217     }
218
219     /**
220      * Sets the namespace class loader.
221      *
222      * @param classLoader the namespace class loader
223      * @return the namespace builder
224      */
225     public Builder setClassLoader(ClassLoader classLoader) {
226       this.classLoader = classLoader;
227       return this;
228     }
229
230     /**
231      * Sets whether backwards/forwards compatible versioned serialization is enabled.
232      * <p>
233      * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
234      * default serializer for types that do not otherwise explicitly specify a serializer.
235      *
236      * @param compatible whether versioned serialization is enabled
237      * @return this
238      */
239     public Builder setCompatible(boolean compatible) {
240       this.compatible = compatible;
241       return this;
242     }
243
244     /**
245      * Sets the registrationRequired flag.
246      *
247      * @param registrationRequired Kryo's registrationRequired flag
248      * @return this
249      * @see Kryo#setRegistrationRequired(boolean)
250      */
251     public Builder setRegistrationRequired(boolean registrationRequired) {
252       this.registrationRequired = registrationRequired;
253       return this;
254     }
255   }
256
257   /**
258    * Creates a new {@link Namespace} builder.
259    *
260    * @return builder
261    */
262   public static Builder builder() {
263     return new Builder();
264   }
265
266   @SuppressWarnings("unchecked")
267   private static List<RegistrationBlock> buildRegistrationBlocks(NamespaceConfig config) {
268     List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
269     List<RegistrationBlock> blocks = new ArrayList<>();
270     blocks.addAll(Namespaces.BASIC.registeredBlocks);
271     for (NamespaceTypeConfig type : config.getTypes()) {
272       try {
273         if (type.getId() == null) {
274           types.add(Pair.of(new Class[]{type.getType()}, type.getSerializer() != null ? type.getSerializer().newInstance() : null));
275         } else {
276           blocks.add(new RegistrationBlock(type.getId(), Collections.singletonList(Pair.of(new Class[]{type.getType()}, type.getSerializer().newInstance()))));
277         }
278       } catch (InstantiationException | IllegalAccessException e) {
279         throw new ConfigurationException("Failed to instantiate serializer from configuration", e);
280       }
281     }
282     blocks.add(new RegistrationBlock(FLOATING_ID, types));
283     return blocks;
284   }
285
286   public Namespace(NamespaceConfig config) {
287     this(buildRegistrationBlocks(config), Thread.currentThread().getContextClassLoader(), config.isRegistrationRequired(), config.isCompatible(), config.getName());
288   }
289
290   /**
291    * Creates a Kryo instance pool.
292    *
293    * @param registeredTypes      types to register
294    * @param registrationRequired whether registration is required
295    * @param compatible           whether compatible serialization is enabled
296    * @param friendlyName         friendly name for the namespace
297    */
298   private Namespace(
299       final List<RegistrationBlock> registeredTypes,
300       ClassLoader classLoader,
301       boolean registrationRequired,
302       boolean compatible,
303       String friendlyName) {
304     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
305     this.registrationRequired = registrationRequired;
306     this.classLoader = classLoader;
307     this.compatible = compatible;
308     this.friendlyName = checkNotNull(friendlyName);
309   }
310
311   /**
312    * Populates the Kryo pool.
313    *
314    * @param instances to add to the pool
315    * @return this
316    */
317   public Namespace populate(int instances) {
318
319     for (int i = 0; i < instances; ++i) {
320       release(create());
321     }
322     return this;
323   }
324
325   /**
326    * Serializes given object to byte array using Kryo instance in pool.
327    * <p>
328    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
329    *
330    * @param obj Object to serialize
331    * @return serialized bytes
332    */
333   public byte[] serialize(final Object obj) {
334     return serialize(obj, DEFAULT_BUFFER_SIZE);
335   }
336
337   /**
338    * Serializes given object to byte array using Kryo instance in pool.
339    *
340    * @param obj        Object to serialize
341    * @param bufferSize maximum size of serialized bytes
342    * @return serialized bytes
343    */
344   public byte[] serialize(final Object obj, final int bufferSize) {
345     return kryoOutputPool.run(output -> {
346       return kryoPool.run(kryo -> {
347         kryo.writeClassAndObject(output, obj);
348         output.flush();
349         return output.getByteArrayOutputStream().toByteArray();
350       });
351     }, bufferSize);
352   }
353
354   /**
355    * Serializes given object to byte buffer using Kryo instance in pool.
356    *
357    * @param obj    Object to serialize
358    * @param buffer to write to
359    */
360   public void serialize(final Object obj, final ByteBuffer buffer) {
361     ByteBufferOutput out = new ByteBufferOutput(buffer);
362     Kryo kryo = borrow();
363     try {
364       kryo.writeClassAndObject(out, obj);
365       out.flush();
366     } finally {
367       release(kryo);
368     }
369   }
370
371   /**
372    * Serializes given object to OutputStream using Kryo instance in pool.
373    *
374    * @param obj    Object to serialize
375    * @param stream to write to
376    */
377   public void serialize(final Object obj, final OutputStream stream) {
378     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
379   }
380
381   /**
382    * Serializes given object to OutputStream using Kryo instance in pool.
383    *
384    * @param obj        Object to serialize
385    * @param stream     to write to
386    * @param bufferSize size of the buffer in front of the stream
387    */
388   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
389     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
390     Kryo kryo = borrow();
391     try {
392       kryo.writeClassAndObject(out, obj);
393       out.flush();
394     } finally {
395       release(kryo);
396     }
397   }
398
399   /**
400    * Deserializes given byte array to Object using Kryo instance in pool.
401    *
402    * @param bytes serialized bytes
403    * @param <T>   deserialized Object type
404    * @return deserialized Object
405    */
406   public <T> T deserialize(final byte[] bytes) {
407     return kryoInputPool.run(input -> {
408       input.setInputStream(new ByteArrayInputStream(bytes));
409       return kryoPool.run(kryo -> {
410         @SuppressWarnings("unchecked")
411         T obj = (T) kryo.readClassAndObject(input);
412         return obj;
413       });
414     }, DEFAULT_BUFFER_SIZE);
415   }
416
417   /**
418    * Deserializes given byte buffer to Object using Kryo instance in pool.
419    *
420    * @param buffer input with serialized bytes
421    * @param <T>    deserialized Object type
422    * @return deserialized Object
423    */
424   public <T> T deserialize(final ByteBuffer buffer) {
425     ByteBufferInput in = new ByteBufferInput(buffer);
426     Kryo kryo = borrow();
427     try {
428       @SuppressWarnings("unchecked")
429       T obj = (T) kryo.readClassAndObject(in);
430       return obj;
431     } finally {
432       release(kryo);
433     }
434   }
435
436   /**
437    * Deserializes given InputStream to an Object using Kryo instance in pool.
438    *
439    * @param stream input stream
440    * @param <T>    deserialized Object type
441    * @return deserialized Object
442    */
443   public <T> T deserialize(final InputStream stream) {
444     return deserialize(stream, DEFAULT_BUFFER_SIZE);
445   }
446
447   /**
448    * Deserializes given InputStream to an Object using Kryo instance in pool.
449    *
450    * @param stream     input stream
451    * @param <T>        deserialized Object type
452    * @param bufferSize size of the buffer in front of the stream
453    * @return deserialized Object
454    */
455   public <T> T deserialize(final InputStream stream, final int bufferSize) {
456     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
457     Kryo kryo = borrow();
458     try {
459       @SuppressWarnings("unchecked")
460       T obj = (T) kryo.readClassAndObject(in);
461       return obj;
462     } finally {
463       release(kryo);
464     }
465   }
466
467   private String friendlyName() {
468     return friendlyName;
469   }
470
471   /**
472    * Gets the number of classes registered in this Kryo namespace.
473    *
474    * @return size of namespace
475    */
476   public int size() {
477     return (int) registeredBlocks.stream()
478         .flatMap(block -> block.types().stream())
479         .count();
480   }
481
482   /**
483    * Creates a Kryo instance.
484    *
485    * @return Kryo instance
486    */
487   @Override
488   public Kryo create() {
489     LOGGER.trace("Creating Kryo instance for {}", this);
490     Kryo kryo = new Kryo();
491     kryo.setClassLoader(classLoader);
492     kryo.setRegistrationRequired(registrationRequired);
493
494     // If compatible serialization is enabled, override the default serializer.
495     if (compatible) {
496       kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
497     }
498
499     // TODO rethink whether we want to use StdInstantiatorStrategy
500     kryo.setInstantiatorStrategy(
501         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
502
503     for (RegistrationBlock block : registeredBlocks) {
504       int id = block.begin();
505       if (id == FLOATING_ID) {
506         id = kryo.getNextRegistrationId();
507       }
508       for (Pair<Class<?>[], Serializer<?>> entry : block.types()) {
509         register(kryo, entry.getLeft(), entry.getRight(), id++);
510       }
511     }
512     return kryo;
513   }
514
515   /**
516    * Register {@code type} and {@code serializer} to {@code kryo} instance.
517    *
518    * @param kryo       Kryo instance
519    * @param types      types to register
520    * @param serializer Specific serializer to register or null to use default.
521    * @param id         type registration id to use
522    */
523   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
524     Registration existing = kryo.getRegistration(id);
525     if (existing != null) {
526       boolean matches = false;
527       for (Class<?> type : types) {
528         if (existing.getType() == type) {
529           matches = true;
530           break;
531         }
532       }
533
534       if (!matches) {
535         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
536             friendlyName(), types, id, existing.getType());
537
538         throw new IllegalStateException(String.format(
539             "Failed to register %s as %s, %s was already registered.",
540             Arrays.toString(types), id, existing.getType()));
541       }
542       // falling through to register call for now.
543       // Consider skipping, if there's reasonable
544       // way to compare serializer equivalence.
545     }
546
547     for (Class<?> type : types) {
548       Registration r = null;
549       if (serializer == null) {
550         r = kryo.register(type, id);
551       } else if (type.isInterface()) {
552         kryo.addDefaultSerializer(type, serializer);
553       } else {
554         r = kryo.register(type, serializer, id);
555       }
556       if (r != null) {
557         if (r.getId() != id) {
558           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
559               friendlyName(), r.getType(), r.getId(), id);
560         }
561         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
562       }
563     }
564   }
565
566   @Override
567   public Kryo borrow() {
568     return kryoPool.borrow();
569   }
570
571   @Override
572   public void release(Kryo kryo) {
573     kryoPool.release(kryo);
574   }
575
576   @Override
577   public <T> T run(KryoCallback<T> callback) {
578     return kryoPool.run(callback);
579   }
580
581   @Override
582   public String toString() {
583     if (!NO_NAME.equals(friendlyName)) {
584       return MoreObjects.toStringHelper(getClass())
585           .omitNullValues()
586           .add("friendlyName", friendlyName)
587           // omit lengthy detail, when there's a name
588           .toString();
589     }
590     return MoreObjects.toStringHelper(getClass())
591         .add("registeredBlocks", registeredBlocks)
592         .toString();
593   }
594
595   static final class RegistrationBlock {
596     private final int begin;
597     private final ImmutableList<Pair<Class<?>[], Serializer<?>>> types;
598
599     RegistrationBlock(int begin, List<Pair<Class<?>[], Serializer<?>>> types) {
600       this.begin = begin;
601       this.types = ImmutableList.copyOf(types);
602     }
603
604     public int begin() {
605       return begin;
606     }
607
608     public ImmutableList<Pair<Class<?>[], Serializer<?>>> types() {
609       return types;
610     }
611
612     @Override
613     public String toString() {
614       return MoreObjects.toStringHelper(getClass())
615           .add("begin", begin)
616           .add("types", types)
617           .toString();
618     }
619
620     @Override
621     public int hashCode() {
622       return types.hashCode();
623     }
624
625     // Only the registered types are used for equality.
626     @Override
627     public boolean equals(Object obj) {
628       if (this == obj) {
629         return true;
630       }
631
632       if (obj instanceof RegistrationBlock) {
633         RegistrationBlock that = (RegistrationBlock) obj;
634         return Objects.equals(this.types, that.types);
635       }
636       return false;
637     }
638   }
639 }