Remove Namespace.DEFAULT
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42
43 import static java.util.Objects.requireNonNull;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * ID to use if this KryoNamespace does not define registration id.
60    */
61   private static final int FLOATING_ID = -1;
62
63   /**
64    * Smallest ID free to use for user defined registrations.
65    */
66   private static final int INITIAL_ID = 16;
67
68   static final String NO_NAME = "(no name)";
69
70   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
71
72   private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
73
74   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
75   private final KryoInputPool kryoInputPool = new KryoInputPool();
76
77   private final ImmutableList<RegistrationBlock> registeredBlocks;
78
79   private final ClassLoader classLoader;
80   private final boolean registrationRequired;
81   private final String friendlyName;
82
83   /**
84    * KryoNamespace builder.
85    */
86   //@NotThreadSafe
87   public static final class Builder {
88     private int blockHeadId = INITIAL_ID;
89     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
90     private List<RegistrationBlock> blocks = new ArrayList<>();
91     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
92     private boolean registrationRequired = true;
93
94     /**
95      * Builds a {@link Namespace} instance.
96      *
97      * @return KryoNamespace
98      */
99     public Namespace build() {
100       return build(NO_NAME);
101     }
102
103     /**
104      * Builds a {@link Namespace} instance.
105      *
106      * @param friendlyName friendly name for the namespace
107      * @return KryoNamespace
108      */
109     public Namespace build(String friendlyName) {
110       if (!types.isEmpty()) {
111         blocks.add(new RegistrationBlock(this.blockHeadId, types));
112       }
113       return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1);
114     }
115
116     /**
117      * Registers serializer for the given set of classes.
118      * <p>
119      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
120      * all instances will be serialized with the same type ID.
121      *
122      * @param classes    list of classes to register
123      * @param serializer serializer to use for the class
124      * @return this
125      */
126     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
127       types.add(Map.entry(classes, serializer));
128       return this;
129     }
130
131     /**
132      * Sets the namespace class loader.
133      *
134      * @param classLoader the namespace class loader
135      * @return the namespace builder
136      */
137     public Builder setClassLoader(ClassLoader classLoader) {
138       this.classLoader = classLoader;
139       return this;
140     }
141
142     /**
143      * Sets the registrationRequired flag.
144      *
145      * @param registrationRequired Kryo's registrationRequired flag
146      * @return this
147      * @see Kryo#setRegistrationRequired(boolean)
148      */
149     public Builder setRegistrationRequired(boolean registrationRequired) {
150       this.registrationRequired = registrationRequired;
151       return this;
152     }
153   }
154
155   /**
156    * Creates a new {@link Namespace} builder.
157    *
158    * @return builder
159    */
160   public static Builder builder() {
161     return new Builder();
162   }
163
164   /**
165    * Creates a Kryo instance pool.
166    *
167    * @param registeredTypes      types to register
168    * @param registrationRequired whether registration is required
169    * @param friendlyName         friendly name for the namespace
170    */
171   private Namespace(
172       final List<RegistrationBlock> registeredTypes,
173       ClassLoader classLoader,
174       boolean registrationRequired,
175       String friendlyName) {
176     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
177     this.registrationRequired = registrationRequired;
178     this.classLoader = classLoader;
179     this.friendlyName = requireNonNull(friendlyName);
180   }
181
182   /**
183    * Populates the Kryo pool.
184    *
185    * @param instances to add to the pool
186    * @return this
187    */
188   public Namespace populate(int instances) {
189
190     for (int i = 0; i < instances; ++i) {
191       release(create());
192     }
193     return this;
194   }
195
196   /**
197    * Serializes given object to byte array using Kryo instance in pool.
198    * <p>
199    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
200    *
201    * @param obj Object to serialize
202    * @return serialized bytes
203    */
204   public byte[] serialize(final Object obj) {
205     return serialize(obj, DEFAULT_BUFFER_SIZE);
206   }
207
208   /**
209    * Serializes given object to byte array using Kryo instance in pool.
210    *
211    * @param obj        Object to serialize
212    * @param bufferSize maximum size of serialized bytes
213    * @return serialized bytes
214    */
215   public byte[] serialize(final Object obj, final int bufferSize) {
216     return kryoOutputPool.run(output -> {
217       return kryoPool.run(kryo -> {
218         kryo.writeClassAndObject(output, obj);
219         output.flush();
220         return output.getByteArrayOutputStream().toByteArray();
221       });
222     }, bufferSize);
223   }
224
225   /**
226    * Serializes given object to byte buffer using Kryo instance in pool.
227    *
228    * @param obj    Object to serialize
229    * @param buffer to write to
230    */
231   public void serialize(final Object obj, final ByteBuffer buffer) {
232     ByteBufferOutput out = new ByteBufferOutput(buffer);
233     Kryo kryo = borrow();
234     try {
235       kryo.writeClassAndObject(out, obj);
236       out.flush();
237     } finally {
238       release(kryo);
239     }
240   }
241
242   /**
243    * Serializes given object to OutputStream using Kryo instance in pool.
244    *
245    * @param obj    Object to serialize
246    * @param stream to write to
247    */
248   public void serialize(final Object obj, final OutputStream stream) {
249     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
250   }
251
252   /**
253    * Serializes given object to OutputStream using Kryo instance in pool.
254    *
255    * @param obj        Object to serialize
256    * @param stream     to write to
257    * @param bufferSize size of the buffer in front of the stream
258    */
259   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
260     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
261     Kryo kryo = borrow();
262     try {
263       kryo.writeClassAndObject(out, obj);
264       out.flush();
265     } finally {
266       release(kryo);
267     }
268   }
269
270   /**
271    * Deserializes given byte array to Object using Kryo instance in pool.
272    *
273    * @param bytes serialized bytes
274    * @param <T>   deserialized Object type
275    * @return deserialized Object
276    */
277   public <T> T deserialize(final byte[] bytes) {
278     return kryoInputPool.run(input -> {
279       input.setInputStream(new ByteArrayInputStream(bytes));
280       return kryoPool.run(kryo -> {
281         @SuppressWarnings("unchecked")
282         T obj = (T) kryo.readClassAndObject(input);
283         return obj;
284       });
285     }, DEFAULT_BUFFER_SIZE);
286   }
287
288   /**
289    * Deserializes given byte buffer to Object using Kryo instance in pool.
290    *
291    * @param buffer input with serialized bytes
292    * @param <T>    deserialized Object type
293    * @return deserialized Object
294    */
295   public <T> T deserialize(final ByteBuffer buffer) {
296     ByteBufferInput in = new ByteBufferInput(buffer);
297     Kryo kryo = borrow();
298     try {
299       @SuppressWarnings("unchecked")
300       T obj = (T) kryo.readClassAndObject(in);
301       return obj;
302     } finally {
303       release(kryo);
304     }
305   }
306
307   /**
308    * Deserializes given InputStream to an Object using Kryo instance in pool.
309    *
310    * @param stream input stream
311    * @param <T>    deserialized Object type
312    * @return deserialized Object
313    */
314   public <T> T deserialize(final InputStream stream) {
315     return deserialize(stream, DEFAULT_BUFFER_SIZE);
316   }
317
318   /**
319    * Deserializes given InputStream to an Object using Kryo instance in pool.
320    *
321    * @param stream     input stream
322    * @param <T>        deserialized Object type
323    * @param bufferSize size of the buffer in front of the stream
324    * @return deserialized Object
325    */
326   public <T> T deserialize(final InputStream stream, final int bufferSize) {
327     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
328     Kryo kryo = borrow();
329     try {
330       @SuppressWarnings("unchecked")
331       T obj = (T) kryo.readClassAndObject(in);
332       return obj;
333     } finally {
334       release(kryo);
335     }
336   }
337
338   private String friendlyName() {
339     return friendlyName;
340   }
341
342   /**
343    * Gets the number of classes registered in this Kryo namespace.
344    *
345    * @return size of namespace
346    */
347   public int size() {
348     return (int) registeredBlocks.stream()
349         .flatMap(block -> block.types().stream())
350         .count();
351   }
352
353   /**
354    * Creates a Kryo instance.
355    *
356    * @return Kryo instance
357    */
358   @Override
359   public Kryo create() {
360     LOGGER.trace("Creating Kryo instance for {}", this);
361     Kryo kryo = new Kryo();
362     kryo.setClassLoader(classLoader);
363     kryo.setRegistrationRequired(registrationRequired);
364
365     // TODO rethink whether we want to use StdInstantiatorStrategy
366     kryo.setInstantiatorStrategy(
367         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
368
369     for (RegistrationBlock block : registeredBlocks) {
370       int id = block.begin();
371       if (id == FLOATING_ID) {
372         id = kryo.getNextRegistrationId();
373       }
374       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
375         register(kryo, entry.getKey(), entry.getValue(), id++);
376       }
377     }
378     return kryo;
379   }
380
381   /**
382    * Register {@code type} and {@code serializer} to {@code kryo} instance.
383    *
384    * @param kryo       Kryo instance
385    * @param types      types to register
386    * @param serializer Specific serializer to register or null to use default.
387    * @param id         type registration id to use
388    */
389   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
390     Registration existing = kryo.getRegistration(id);
391     if (existing != null) {
392       boolean matches = false;
393       for (Class<?> type : types) {
394         if (existing.getType() == type) {
395           matches = true;
396           break;
397         }
398       }
399
400       if (!matches) {
401         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
402             friendlyName(), types, id, existing.getType());
403
404         throw new IllegalStateException(String.format(
405             "Failed to register %s as %s, %s was already registered.",
406             Arrays.toString(types), id, existing.getType()));
407       }
408       // falling through to register call for now.
409       // Consider skipping, if there's reasonable
410       // way to compare serializer equivalence.
411     }
412
413     for (Class<?> type : types) {
414       Registration r = null;
415       if (serializer == null) {
416         r = kryo.register(type, id);
417       } else if (type.isInterface()) {
418         kryo.addDefaultSerializer(type, serializer);
419       } else {
420         r = kryo.register(type, serializer, id);
421       }
422       if (r != null) {
423         if (r.getId() != id) {
424           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
425               friendlyName(), r.getType(), r.getId(), id);
426         }
427         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
428       }
429     }
430   }
431
432   @Override
433   public Kryo borrow() {
434     return kryoPool.borrow();
435   }
436
437   @Override
438   public void release(Kryo kryo) {
439     kryoPool.release(kryo);
440   }
441
442   @Override
443   public <T> T run(KryoCallback<T> callback) {
444     return kryoPool.run(callback);
445   }
446
447   @Override
448   public String toString() {
449     if (!NO_NAME.equals(friendlyName)) {
450       return MoreObjects.toStringHelper(getClass())
451           .omitNullValues()
452           .add("friendlyName", friendlyName)
453           // omit lengthy detail, when there's a name
454           .toString();
455     }
456     return MoreObjects.toStringHelper(getClass())
457         .add("registeredBlocks", registeredBlocks)
458         .toString();
459   }
460
461   static final class RegistrationBlock {
462     private final int begin;
463     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
464
465     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
466       this.begin = begin;
467       this.types = ImmutableList.copyOf(types);
468     }
469
470     public int begin() {
471       return begin;
472     }
473
474     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
475       return types;
476     }
477
478     @Override
479     public String toString() {
480       return MoreObjects.toStringHelper(getClass())
481           .add("begin", begin)
482           .add("types", types)
483           .toString();
484     }
485
486     @Override
487     public int hashCode() {
488       return types.hashCode();
489     }
490
491     // Only the registered types are used for equality.
492     @Override
493     public boolean equals(Object obj) {
494       if (this == obj) {
495         return true;
496       }
497
498       if (obj instanceof RegistrationBlock) {
499         RegistrationBlock that = (RegistrationBlock) obj;
500         return Objects.equals(this.types, that.types);
501       }
502       return false;
503     }
504   }
505 }