Remove Namespace.Builder.setCompatible()
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42
43 import static java.util.Objects.requireNonNull;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * Maximum allowed buffer size.
60    */
61   public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
62
63   /**
64    * ID to use if this KryoNamespace does not define registration id.
65    */
66   private static final int FLOATING_ID = -1;
67
68   /**
69    * Smallest ID free to use for user defined registrations.
70    */
71   private static final int INITIAL_ID = 16;
72
73   static final String NO_NAME = "(no name)";
74
75   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
76
77   /**
78    * Default Kryo namespace.
79    */
80   public static final Namespace DEFAULT = builder().build();
81
82   private final KryoPool kryoPool = new KryoPool.Builder(this)
83       .softReferences()
84       .build();
85
86   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
87   private final KryoInputPool kryoInputPool = new KryoInputPool();
88
89   private final ImmutableList<RegistrationBlock> registeredBlocks;
90
91   private final ClassLoader classLoader;
92   private final boolean registrationRequired;
93   private final String friendlyName;
94
95   /**
96    * KryoNamespace builder.
97    */
98   //@NotThreadSafe
99   public static final class Builder {
100     private int blockHeadId = INITIAL_ID;
101     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
102     private List<RegistrationBlock> blocks = new ArrayList<>();
103     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
104     private boolean registrationRequired = true;
105
106     /**
107      * Builds a {@link Namespace} instance.
108      *
109      * @return KryoNamespace
110      */
111     public Namespace build() {
112       return build(NO_NAME);
113     }
114
115     /**
116      * Builds a {@link Namespace} instance.
117      *
118      * @param friendlyName friendly name for the namespace
119      * @return KryoNamespace
120      */
121     public Namespace build(String friendlyName) {
122       if (!types.isEmpty()) {
123         blocks.add(new RegistrationBlock(this.blockHeadId, types));
124       }
125       return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1);
126     }
127
128     /**
129      * Registers serializer for the given set of classes.
130      * <p>
131      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
132      * all instances will be serialized with the same type ID.
133      *
134      * @param classes    list of classes to register
135      * @param serializer serializer to use for the class
136      * @return this
137      */
138     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
139       types.add(Map.entry(classes, serializer));
140       return this;
141     }
142
143     /**
144      * Sets the namespace class loader.
145      *
146      * @param classLoader the namespace class loader
147      * @return the namespace builder
148      */
149     public Builder setClassLoader(ClassLoader classLoader) {
150       this.classLoader = classLoader;
151       return this;
152     }
153
154     /**
155      * Sets the registrationRequired flag.
156      *
157      * @param registrationRequired Kryo's registrationRequired flag
158      * @return this
159      * @see Kryo#setRegistrationRequired(boolean)
160      */
161     public Builder setRegistrationRequired(boolean registrationRequired) {
162       this.registrationRequired = registrationRequired;
163       return this;
164     }
165   }
166
167   /**
168    * Creates a new {@link Namespace} builder.
169    *
170    * @return builder
171    */
172   public static Builder builder() {
173     return new Builder();
174   }
175
176   /**
177    * Creates a Kryo instance pool.
178    *
179    * @param registeredTypes      types to register
180    * @param registrationRequired whether registration is required
181    * @param friendlyName         friendly name for the namespace
182    */
183   private Namespace(
184       final List<RegistrationBlock> registeredTypes,
185       ClassLoader classLoader,
186       boolean registrationRequired,
187       String friendlyName) {
188     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
189     this.registrationRequired = registrationRequired;
190     this.classLoader = classLoader;
191     this.friendlyName = requireNonNull(friendlyName);
192   }
193
194   /**
195    * Populates the Kryo pool.
196    *
197    * @param instances to add to the pool
198    * @return this
199    */
200   public Namespace populate(int instances) {
201
202     for (int i = 0; i < instances; ++i) {
203       release(create());
204     }
205     return this;
206   }
207
208   /**
209    * Serializes given object to byte array using Kryo instance in pool.
210    * <p>
211    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
212    *
213    * @param obj Object to serialize
214    * @return serialized bytes
215    */
216   public byte[] serialize(final Object obj) {
217     return serialize(obj, DEFAULT_BUFFER_SIZE);
218   }
219
220   /**
221    * Serializes given object to byte array using Kryo instance in pool.
222    *
223    * @param obj        Object to serialize
224    * @param bufferSize maximum size of serialized bytes
225    * @return serialized bytes
226    */
227   public byte[] serialize(final Object obj, final int bufferSize) {
228     return kryoOutputPool.run(output -> {
229       return kryoPool.run(kryo -> {
230         kryo.writeClassAndObject(output, obj);
231         output.flush();
232         return output.getByteArrayOutputStream().toByteArray();
233       });
234     }, bufferSize);
235   }
236
237   /**
238    * Serializes given object to byte buffer using Kryo instance in pool.
239    *
240    * @param obj    Object to serialize
241    * @param buffer to write to
242    */
243   public void serialize(final Object obj, final ByteBuffer buffer) {
244     ByteBufferOutput out = new ByteBufferOutput(buffer);
245     Kryo kryo = borrow();
246     try {
247       kryo.writeClassAndObject(out, obj);
248       out.flush();
249     } finally {
250       release(kryo);
251     }
252   }
253
254   /**
255    * Serializes given object to OutputStream using Kryo instance in pool.
256    *
257    * @param obj    Object to serialize
258    * @param stream to write to
259    */
260   public void serialize(final Object obj, final OutputStream stream) {
261     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
262   }
263
264   /**
265    * Serializes given object to OutputStream using Kryo instance in pool.
266    *
267    * @param obj        Object to serialize
268    * @param stream     to write to
269    * @param bufferSize size of the buffer in front of the stream
270    */
271   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
272     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
273     Kryo kryo = borrow();
274     try {
275       kryo.writeClassAndObject(out, obj);
276       out.flush();
277     } finally {
278       release(kryo);
279     }
280   }
281
282   /**
283    * Deserializes given byte array to Object using Kryo instance in pool.
284    *
285    * @param bytes serialized bytes
286    * @param <T>   deserialized Object type
287    * @return deserialized Object
288    */
289   public <T> T deserialize(final byte[] bytes) {
290     return kryoInputPool.run(input -> {
291       input.setInputStream(new ByteArrayInputStream(bytes));
292       return kryoPool.run(kryo -> {
293         @SuppressWarnings("unchecked")
294         T obj = (T) kryo.readClassAndObject(input);
295         return obj;
296       });
297     }, DEFAULT_BUFFER_SIZE);
298   }
299
300   /**
301    * Deserializes given byte buffer to Object using Kryo instance in pool.
302    *
303    * @param buffer input with serialized bytes
304    * @param <T>    deserialized Object type
305    * @return deserialized Object
306    */
307   public <T> T deserialize(final ByteBuffer buffer) {
308     ByteBufferInput in = new ByteBufferInput(buffer);
309     Kryo kryo = borrow();
310     try {
311       @SuppressWarnings("unchecked")
312       T obj = (T) kryo.readClassAndObject(in);
313       return obj;
314     } finally {
315       release(kryo);
316     }
317   }
318
319   /**
320    * Deserializes given InputStream to an Object using Kryo instance in pool.
321    *
322    * @param stream input stream
323    * @param <T>    deserialized Object type
324    * @return deserialized Object
325    */
326   public <T> T deserialize(final InputStream stream) {
327     return deserialize(stream, DEFAULT_BUFFER_SIZE);
328   }
329
330   /**
331    * Deserializes given InputStream to an Object using Kryo instance in pool.
332    *
333    * @param stream     input stream
334    * @param <T>        deserialized Object type
335    * @param bufferSize size of the buffer in front of the stream
336    * @return deserialized Object
337    */
338   public <T> T deserialize(final InputStream stream, final int bufferSize) {
339     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
340     Kryo kryo = borrow();
341     try {
342       @SuppressWarnings("unchecked")
343       T obj = (T) kryo.readClassAndObject(in);
344       return obj;
345     } finally {
346       release(kryo);
347     }
348   }
349
350   private String friendlyName() {
351     return friendlyName;
352   }
353
354   /**
355    * Gets the number of classes registered in this Kryo namespace.
356    *
357    * @return size of namespace
358    */
359   public int size() {
360     return (int) registeredBlocks.stream()
361         .flatMap(block -> block.types().stream())
362         .count();
363   }
364
365   /**
366    * Creates a Kryo instance.
367    *
368    * @return Kryo instance
369    */
370   @Override
371   public Kryo create() {
372     LOGGER.trace("Creating Kryo instance for {}", this);
373     Kryo kryo = new Kryo();
374     kryo.setClassLoader(classLoader);
375     kryo.setRegistrationRequired(registrationRequired);
376
377     // TODO rethink whether we want to use StdInstantiatorStrategy
378     kryo.setInstantiatorStrategy(
379         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
380
381     for (RegistrationBlock block : registeredBlocks) {
382       int id = block.begin();
383       if (id == FLOATING_ID) {
384         id = kryo.getNextRegistrationId();
385       }
386       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
387         register(kryo, entry.getKey(), entry.getValue(), id++);
388       }
389     }
390     return kryo;
391   }
392
393   /**
394    * Register {@code type} and {@code serializer} to {@code kryo} instance.
395    *
396    * @param kryo       Kryo instance
397    * @param types      types to register
398    * @param serializer Specific serializer to register or null to use default.
399    * @param id         type registration id to use
400    */
401   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
402     Registration existing = kryo.getRegistration(id);
403     if (existing != null) {
404       boolean matches = false;
405       for (Class<?> type : types) {
406         if (existing.getType() == type) {
407           matches = true;
408           break;
409         }
410       }
411
412       if (!matches) {
413         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
414             friendlyName(), types, id, existing.getType());
415
416         throw new IllegalStateException(String.format(
417             "Failed to register %s as %s, %s was already registered.",
418             Arrays.toString(types), id, existing.getType()));
419       }
420       // falling through to register call for now.
421       // Consider skipping, if there's reasonable
422       // way to compare serializer equivalence.
423     }
424
425     for (Class<?> type : types) {
426       Registration r = null;
427       if (serializer == null) {
428         r = kryo.register(type, id);
429       } else if (type.isInterface()) {
430         kryo.addDefaultSerializer(type, serializer);
431       } else {
432         r = kryo.register(type, serializer, id);
433       }
434       if (r != null) {
435         if (r.getId() != id) {
436           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
437               friendlyName(), r.getType(), r.getId(), id);
438         }
439         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
440       }
441     }
442   }
443
444   @Override
445   public Kryo borrow() {
446     return kryoPool.borrow();
447   }
448
449   @Override
450   public void release(Kryo kryo) {
451     kryoPool.release(kryo);
452   }
453
454   @Override
455   public <T> T run(KryoCallback<T> callback) {
456     return kryoPool.run(callback);
457   }
458
459   @Override
460   public String toString() {
461     if (!NO_NAME.equals(friendlyName)) {
462       return MoreObjects.toStringHelper(getClass())
463           .omitNullValues()
464           .add("friendlyName", friendlyName)
465           // omit lengthy detail, when there's a name
466           .toString();
467     }
468     return MoreObjects.toStringHelper(getClass())
469         .add("registeredBlocks", registeredBlocks)
470         .toString();
471   }
472
473   static final class RegistrationBlock {
474     private final int begin;
475     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
476
477     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
478       this.begin = begin;
479       this.types = ImmutableList.copyOf(types);
480     }
481
482     public int begin() {
483       return begin;
484     }
485
486     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
487       return types;
488     }
489
490     @Override
491     public String toString() {
492       return MoreObjects.toStringHelper(getClass())
493           .add("begin", begin)
494           .add("types", types)
495           .toString();
496     }
497
498     @Override
499     public int hashCode() {
500       return types.hashCode();
501     }
502
503     // Only the registered types are used for equality.
504     @Override
505     public boolean equals(Object obj) {
506       if (this == obj) {
507         return true;
508       }
509
510       if (obj instanceof RegistrationBlock) {
511         RegistrationBlock that = (RegistrationBlock) obj;
512         return Objects.equals(this.types, that.types);
513       }
514       return false;
515     }
516   }
517 }