Eliminate atomix.utils.memory
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
27 import com.google.common.base.MoreObjects;
28 import com.google.common.collect.ImmutableList;
29 import org.apache.commons.lang3.tuple.Pair;
30 import org.objenesis.strategy.StdInstantiatorStrategy;
31 import org.slf4j.Logger;
32
33 import java.io.ByteArrayInputStream;
34 import java.io.InputStream;
35 import java.io.OutputStream;
36 import java.nio.ByteBuffer;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.List;
40 import java.util.Objects;
41
42 import static com.google.common.base.Preconditions.checkNotNull;
43 import static org.slf4j.LoggerFactory.getLogger;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * Maximum allowed buffer size.
60    */
61   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
62
63   /**
64    * ID to use if this KryoNamespace does not define registration id.
65    */
66   public static final int FLOATING_ID = -1;
67
68   /**
69    * Smallest ID free to use for user defined registrations.
70    */
71   public static final int INITIAL_ID = 16;
72
73   static final String NO_NAME = "(no name)";
74
75   private static final Logger LOGGER = getLogger(Namespace.class);
76
77   /**
78    * Default Kryo namespace.
79    */
80   public static final Namespace DEFAULT = builder().build();
81
82   private final KryoPool kryoPool = new KryoPool.Builder(this)
83       .softReferences()
84       .build();
85
86   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
87   private final KryoInputPool kryoInputPool = new KryoInputPool();
88
89   private final ImmutableList<RegistrationBlock> registeredBlocks;
90
91   private final ClassLoader classLoader;
92   private final boolean compatible;
93   private final boolean registrationRequired;
94   private final String friendlyName;
95
96   /**
97    * KryoNamespace builder.
98    */
99   //@NotThreadSafe
100   public static final class Builder {
101     private int blockHeadId = INITIAL_ID;
102     private List<Pair<Class<?>[], Serializer<?>>> types = new ArrayList<>();
103     private List<RegistrationBlock> blocks = new ArrayList<>();
104     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
105     private boolean registrationRequired = true;
106     private boolean compatible = false;
107
108     /**
109      * Builds a {@link Namespace} instance.
110      *
111      * @return KryoNamespace
112      */
113     public Namespace build() {
114       return build(NO_NAME);
115     }
116
117     /**
118      * Builds a {@link Namespace} instance.
119      *
120      * @param friendlyName friendly name for the namespace
121      * @return KryoNamespace
122      */
123     public Namespace build(String friendlyName) {
124       if (!types.isEmpty()) {
125         blocks.add(new RegistrationBlock(this.blockHeadId, types));
126       }
127       return new Namespace(blocks, classLoader, registrationRequired, compatible, friendlyName).populate(1);
128     }
129
130     /**
131      * Sets the next Kryo registration Id for following register entries.
132      *
133      * @param id Kryo registration Id
134      * @return this
135      * @see Kryo#register(Class, Serializer, int)
136      */
137     public Builder nextId(final int id) {
138       if (!types.isEmpty()) {
139         if (id != FLOATING_ID && id < blockHeadId + types.size()) {
140
141           if (LOGGER.isWarnEnabled()) {
142             LOGGER.warn("requested nextId {} could potentially overlap "
143                     + "with existing registrations {}+{} ",
144                 id, blockHeadId, types.size(), new RuntimeException());
145           }
146         }
147         blocks.add(new RegistrationBlock(this.blockHeadId, types));
148         types = new ArrayList<>();
149       }
150       this.blockHeadId = id;
151       return this;
152     }
153
154     /**
155      * Registers classes to be serialized using Kryo default serializer.
156      *
157      * @param expectedTypes list of classes
158      * @return this
159      */
160     public Builder register(final Class<?>... expectedTypes) {
161       for (Class<?> clazz : expectedTypes) {
162         types.add(Pair.of(new Class<?>[]{clazz}, null));
163       }
164       return this;
165     }
166
167     /**
168      * Registers serializer for the given set of classes.
169      * <p>
170      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
171      * all instances will be serialized with the same type ID.
172      *
173      * @param classes    list of classes to register
174      * @param serializer serializer to use for the class
175      * @return this
176      */
177     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
178       types.add(Pair.of(classes, checkNotNull(serializer)));
179       return this;
180     }
181
182     private Builder register(RegistrationBlock block) {
183       if (block.begin() != FLOATING_ID) {
184         // flush pending types
185         nextId(block.begin());
186         blocks.add(block);
187         nextId(block.begin() + block.types().size());
188       } else {
189         // flush pending types
190         final int addedBlockBegin = blockHeadId + types.size();
191         nextId(addedBlockBegin);
192         blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
193         nextId(addedBlockBegin + block.types().size());
194       }
195       return this;
196     }
197
198     /**
199      * Registers all the class registered to given KryoNamespace.
200      *
201      * @param ns KryoNamespace
202      * @return this
203      */
204     public Builder register(final Namespace ns) {
205
206       if (blocks.containsAll(ns.registeredBlocks)) {
207         // Everything was already registered.
208         LOGGER.debug("Ignoring {}, already registered.", ns);
209         return this;
210       }
211       for (RegistrationBlock block : ns.registeredBlocks) {
212         this.register(block);
213       }
214       return this;
215     }
216
217     /**
218      * Sets the namespace class loader.
219      *
220      * @param classLoader the namespace class loader
221      * @return the namespace builder
222      */
223     public Builder setClassLoader(ClassLoader classLoader) {
224       this.classLoader = classLoader;
225       return this;
226     }
227
228     /**
229      * Sets whether backwards/forwards compatible versioned serialization is enabled.
230      * <p>
231      * When compatible serialization is enabled, the {@link CompatibleFieldSerializer} will be set as the
232      * default serializer for types that do not otherwise explicitly specify a serializer.
233      *
234      * @param compatible whether versioned serialization is enabled
235      * @return this
236      */
237     public Builder setCompatible(boolean compatible) {
238       this.compatible = compatible;
239       return this;
240     }
241
242     /**
243      * Sets the registrationRequired flag.
244      *
245      * @param registrationRequired Kryo's registrationRequired flag
246      * @return this
247      * @see Kryo#setRegistrationRequired(boolean)
248      */
249     public Builder setRegistrationRequired(boolean registrationRequired) {
250       this.registrationRequired = registrationRequired;
251       return this;
252     }
253   }
254
255   /**
256    * Creates a new {@link Namespace} builder.
257    *
258    * @return builder
259    */
260   public static Builder builder() {
261     return new Builder();
262   }
263
264   /**
265    * Creates a Kryo instance pool.
266    *
267    * @param registeredTypes      types to register
268    * @param registrationRequired whether registration is required
269    * @param compatible           whether compatible serialization is enabled
270    * @param friendlyName         friendly name for the namespace
271    */
272   private Namespace(
273       final List<RegistrationBlock> registeredTypes,
274       ClassLoader classLoader,
275       boolean registrationRequired,
276       boolean compatible,
277       String friendlyName) {
278     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
279     this.registrationRequired = registrationRequired;
280     this.classLoader = classLoader;
281     this.compatible = compatible;
282     this.friendlyName = checkNotNull(friendlyName);
283   }
284
285   /**
286    * Populates the Kryo pool.
287    *
288    * @param instances to add to the pool
289    * @return this
290    */
291   public Namespace populate(int instances) {
292
293     for (int i = 0; i < instances; ++i) {
294       release(create());
295     }
296     return this;
297   }
298
299   /**
300    * Serializes given object to byte array using Kryo instance in pool.
301    * <p>
302    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
303    *
304    * @param obj Object to serialize
305    * @return serialized bytes
306    */
307   public byte[] serialize(final Object obj) {
308     return serialize(obj, DEFAULT_BUFFER_SIZE);
309   }
310
311   /**
312    * Serializes given object to byte array using Kryo instance in pool.
313    *
314    * @param obj        Object to serialize
315    * @param bufferSize maximum size of serialized bytes
316    * @return serialized bytes
317    */
318   public byte[] serialize(final Object obj, final int bufferSize) {
319     return kryoOutputPool.run(output -> {
320       return kryoPool.run(kryo -> {
321         kryo.writeClassAndObject(output, obj);
322         output.flush();
323         return output.getByteArrayOutputStream().toByteArray();
324       });
325     }, bufferSize);
326   }
327
328   /**
329    * Serializes given object to byte buffer using Kryo instance in pool.
330    *
331    * @param obj    Object to serialize
332    * @param buffer to write to
333    */
334   public void serialize(final Object obj, final ByteBuffer buffer) {
335     ByteBufferOutput out = new ByteBufferOutput(buffer);
336     Kryo kryo = borrow();
337     try {
338       kryo.writeClassAndObject(out, obj);
339       out.flush();
340     } finally {
341       release(kryo);
342     }
343   }
344
345   /**
346    * Serializes given object to OutputStream using Kryo instance in pool.
347    *
348    * @param obj    Object to serialize
349    * @param stream to write to
350    */
351   public void serialize(final Object obj, final OutputStream stream) {
352     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
353   }
354
355   /**
356    * Serializes given object to OutputStream using Kryo instance in pool.
357    *
358    * @param obj        Object to serialize
359    * @param stream     to write to
360    * @param bufferSize size of the buffer in front of the stream
361    */
362   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
363     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
364     Kryo kryo = borrow();
365     try {
366       kryo.writeClassAndObject(out, obj);
367       out.flush();
368     } finally {
369       release(kryo);
370     }
371   }
372
373   /**
374    * Deserializes given byte array to Object using Kryo instance in pool.
375    *
376    * @param bytes serialized bytes
377    * @param <T>   deserialized Object type
378    * @return deserialized Object
379    */
380   public <T> T deserialize(final byte[] bytes) {
381     return kryoInputPool.run(input -> {
382       input.setInputStream(new ByteArrayInputStream(bytes));
383       return kryoPool.run(kryo -> {
384         @SuppressWarnings("unchecked")
385         T obj = (T) kryo.readClassAndObject(input);
386         return obj;
387       });
388     }, DEFAULT_BUFFER_SIZE);
389   }
390
391   /**
392    * Deserializes given byte buffer to Object using Kryo instance in pool.
393    *
394    * @param buffer input with serialized bytes
395    * @param <T>    deserialized Object type
396    * @return deserialized Object
397    */
398   public <T> T deserialize(final ByteBuffer buffer) {
399     ByteBufferInput in = new ByteBufferInput(buffer);
400     Kryo kryo = borrow();
401     try {
402       @SuppressWarnings("unchecked")
403       T obj = (T) kryo.readClassAndObject(in);
404       return obj;
405     } finally {
406       release(kryo);
407     }
408   }
409
410   /**
411    * Deserializes given InputStream to an Object using Kryo instance in pool.
412    *
413    * @param stream input stream
414    * @param <T>    deserialized Object type
415    * @return deserialized Object
416    */
417   public <T> T deserialize(final InputStream stream) {
418     return deserialize(stream, DEFAULT_BUFFER_SIZE);
419   }
420
421   /**
422    * Deserializes given InputStream to an Object using Kryo instance in pool.
423    *
424    * @param stream     input stream
425    * @param <T>        deserialized Object type
426    * @param bufferSize size of the buffer in front of the stream
427    * @return deserialized Object
428    */
429   public <T> T deserialize(final InputStream stream, final int bufferSize) {
430     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
431     Kryo kryo = borrow();
432     try {
433       @SuppressWarnings("unchecked")
434       T obj = (T) kryo.readClassAndObject(in);
435       return obj;
436     } finally {
437       release(kryo);
438     }
439   }
440
441   private String friendlyName() {
442     return friendlyName;
443   }
444
445   /**
446    * Gets the number of classes registered in this Kryo namespace.
447    *
448    * @return size of namespace
449    */
450   public int size() {
451     return (int) registeredBlocks.stream()
452         .flatMap(block -> block.types().stream())
453         .count();
454   }
455
456   /**
457    * Creates a Kryo instance.
458    *
459    * @return Kryo instance
460    */
461   @Override
462   public Kryo create() {
463     LOGGER.trace("Creating Kryo instance for {}", this);
464     Kryo kryo = new Kryo();
465     kryo.setClassLoader(classLoader);
466     kryo.setRegistrationRequired(registrationRequired);
467
468     // If compatible serialization is enabled, override the default serializer.
469     if (compatible) {
470       kryo.setDefaultSerializer(CompatibleFieldSerializer::new);
471     }
472
473     // TODO rethink whether we want to use StdInstantiatorStrategy
474     kryo.setInstantiatorStrategy(
475         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
476
477     for (RegistrationBlock block : registeredBlocks) {
478       int id = block.begin();
479       if (id == FLOATING_ID) {
480         id = kryo.getNextRegistrationId();
481       }
482       for (Pair<Class<?>[], Serializer<?>> entry : block.types()) {
483         register(kryo, entry.getLeft(), entry.getRight(), id++);
484       }
485     }
486     return kryo;
487   }
488
489   /**
490    * Register {@code type} and {@code serializer} to {@code kryo} instance.
491    *
492    * @param kryo       Kryo instance
493    * @param types      types to register
494    * @param serializer Specific serializer to register or null to use default.
495    * @param id         type registration id to use
496    */
497   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
498     Registration existing = kryo.getRegistration(id);
499     if (existing != null) {
500       boolean matches = false;
501       for (Class<?> type : types) {
502         if (existing.getType() == type) {
503           matches = true;
504           break;
505         }
506       }
507
508       if (!matches) {
509         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
510             friendlyName(), types, id, existing.getType());
511
512         throw new IllegalStateException(String.format(
513             "Failed to register %s as %s, %s was already registered.",
514             Arrays.toString(types), id, existing.getType()));
515       }
516       // falling through to register call for now.
517       // Consider skipping, if there's reasonable
518       // way to compare serializer equivalence.
519     }
520
521     for (Class<?> type : types) {
522       Registration r = null;
523       if (serializer == null) {
524         r = kryo.register(type, id);
525       } else if (type.isInterface()) {
526         kryo.addDefaultSerializer(type, serializer);
527       } else {
528         r = kryo.register(type, serializer, id);
529       }
530       if (r != null) {
531         if (r.getId() != id) {
532           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
533               friendlyName(), r.getType(), r.getId(), id);
534         }
535         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
536       }
537     }
538   }
539
540   @Override
541   public Kryo borrow() {
542     return kryoPool.borrow();
543   }
544
545   @Override
546   public void release(Kryo kryo) {
547     kryoPool.release(kryo);
548   }
549
550   @Override
551   public <T> T run(KryoCallback<T> callback) {
552     return kryoPool.run(callback);
553   }
554
555   @Override
556   public String toString() {
557     if (!NO_NAME.equals(friendlyName)) {
558       return MoreObjects.toStringHelper(getClass())
559           .omitNullValues()
560           .add("friendlyName", friendlyName)
561           // omit lengthy detail, when there's a name
562           .toString();
563     }
564     return MoreObjects.toStringHelper(getClass())
565         .add("registeredBlocks", registeredBlocks)
566         .toString();
567   }
568
569   static final class RegistrationBlock {
570     private final int begin;
571     private final ImmutableList<Pair<Class<?>[], Serializer<?>>> types;
572
573     RegistrationBlock(int begin, List<Pair<Class<?>[], Serializer<?>>> types) {
574       this.begin = begin;
575       this.types = ImmutableList.copyOf(types);
576     }
577
578     public int begin() {
579       return begin;
580     }
581
582     public ImmutableList<Pair<Class<?>[], Serializer<?>>> types() {
583       return types;
584     }
585
586     @Override
587     public String toString() {
588       return MoreObjects.toStringHelper(getClass())
589           .add("begin", begin)
590           .add("types", types)
591           .toString();
592     }
593
594     @Override
595     public int hashCode() {
596       return types.hashCode();
597     }
598
599     // Only the registered types are used for equality.
600     @Override
601     public boolean equals(Object obj) {
602       if (this == obj) {
603         return true;
604       }
605
606       if (obj instanceof RegistrationBlock) {
607         RegistrationBlock that = (RegistrationBlock) obj;
608         return Objects.equals(this.types, that.types);
609       }
610       return false;
611     }
612   }
613 }