Remove Namespace.friendlyName()
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42
43 import static java.util.Objects.requireNonNull;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * ID to use if this KryoNamespace does not define registration id.
60    */
61   private static final int FLOATING_ID = -1;
62
63   /**
64    * Smallest ID free to use for user defined registrations.
65    */
66   private static final int INITIAL_ID = 16;
67
68   static final String NO_NAME = "(no name)";
69
70   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
71
72   private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
73
74   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
75   private final KryoInputPool kryoInputPool = new KryoInputPool();
76
77   private final ImmutableList<RegistrationBlock> registeredBlocks;
78
79   private final ClassLoader classLoader;
80   private final String friendlyName;
81
82   /**
83    * KryoNamespace builder.
84    */
85   //@NotThreadSafe
86   public static final class Builder {
87     private int blockHeadId = INITIAL_ID;
88     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
89     private List<RegistrationBlock> blocks = new ArrayList<>();
90     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
91
92     /**
93      * Builds a {@link Namespace} instance.
94      *
95      * @return KryoNamespace
96      */
97     public Namespace build() {
98       return build(NO_NAME);
99     }
100
101     /**
102      * Builds a {@link Namespace} instance.
103      *
104      * @param friendlyName friendly name for the namespace
105      * @return KryoNamespace
106      */
107     public Namespace build(String friendlyName) {
108       if (!types.isEmpty()) {
109         blocks.add(new RegistrationBlock(this.blockHeadId, types));
110       }
111       return new Namespace(blocks, classLoader, friendlyName).populate(1);
112     }
113
114     /**
115      * Registers serializer for the given set of classes.
116      * <p>
117      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
118      * all instances will be serialized with the same type ID.
119      *
120      * @param classes    list of classes to register
121      * @param serializer serializer to use for the class
122      * @return this
123      */
124     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
125       types.add(Map.entry(classes, serializer));
126       return this;
127     }
128
129     /**
130      * Sets the namespace class loader.
131      *
132      * @param classLoader the namespace class loader
133      * @return the namespace builder
134      */
135     public Builder setClassLoader(ClassLoader classLoader) {
136       this.classLoader = classLoader;
137       return this;
138     }
139   }
140
141   /**
142    * Creates a new {@link Namespace} builder.
143    *
144    * @return builder
145    */
146   public static Builder builder() {
147     return new Builder();
148   }
149
150   /**
151    * Creates a Kryo instance pool.
152    *
153    * @param registeredTypes      types to register
154    * @param registrationRequired whether registration is required
155    * @param friendlyName         friendly name for the namespace
156    */
157   private Namespace(
158       final List<RegistrationBlock> registeredTypes,
159       ClassLoader classLoader,
160       String friendlyName) {
161     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
162     this.classLoader = classLoader;
163     this.friendlyName = requireNonNull(friendlyName);
164   }
165
166   /**
167    * Populates the Kryo pool.
168    *
169    * @param instances to add to the pool
170    * @return this
171    */
172   public Namespace populate(int instances) {
173
174     for (int i = 0; i < instances; ++i) {
175       release(create());
176     }
177     return this;
178   }
179
180   /**
181    * Serializes given object to byte array using Kryo instance in pool.
182    * <p>
183    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
184    *
185    * @param obj Object to serialize
186    * @return serialized bytes
187    */
188   public byte[] serialize(final Object obj) {
189     return serialize(obj, DEFAULT_BUFFER_SIZE);
190   }
191
192   /**
193    * Serializes given object to byte array using Kryo instance in pool.
194    *
195    * @param obj        Object to serialize
196    * @param bufferSize maximum size of serialized bytes
197    * @return serialized bytes
198    */
199   public byte[] serialize(final Object obj, final int bufferSize) {
200     return kryoOutputPool.run(output -> {
201       return kryoPool.run(kryo -> {
202         kryo.writeClassAndObject(output, obj);
203         output.flush();
204         return output.getByteArrayOutputStream().toByteArray();
205       });
206     }, bufferSize);
207   }
208
209   /**
210    * Serializes given object to byte buffer using Kryo instance in pool.
211    *
212    * @param obj    Object to serialize
213    * @param buffer to write to
214    */
215   public void serialize(final Object obj, final ByteBuffer buffer) {
216     ByteBufferOutput out = new ByteBufferOutput(buffer);
217     Kryo kryo = borrow();
218     try {
219       kryo.writeClassAndObject(out, obj);
220       out.flush();
221     } finally {
222       release(kryo);
223     }
224   }
225
226   /**
227    * Serializes given object to OutputStream using Kryo instance in pool.
228    *
229    * @param obj    Object to serialize
230    * @param stream to write to
231    */
232   public void serialize(final Object obj, final OutputStream stream) {
233     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
234   }
235
236   /**
237    * Serializes given object to OutputStream using Kryo instance in pool.
238    *
239    * @param obj        Object to serialize
240    * @param stream     to write to
241    * @param bufferSize size of the buffer in front of the stream
242    */
243   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
244     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
245     Kryo kryo = borrow();
246     try {
247       kryo.writeClassAndObject(out, obj);
248       out.flush();
249     } finally {
250       release(kryo);
251     }
252   }
253
254   /**
255    * Deserializes given byte array to Object using Kryo instance in pool.
256    *
257    * @param bytes serialized bytes
258    * @param <T>   deserialized Object type
259    * @return deserialized Object
260    */
261   public <T> T deserialize(final byte[] bytes) {
262     return kryoInputPool.run(input -> {
263       input.setInputStream(new ByteArrayInputStream(bytes));
264       return kryoPool.run(kryo -> {
265         @SuppressWarnings("unchecked")
266         T obj = (T) kryo.readClassAndObject(input);
267         return obj;
268       });
269     }, DEFAULT_BUFFER_SIZE);
270   }
271
272   /**
273    * Deserializes given byte buffer to Object using Kryo instance in pool.
274    *
275    * @param buffer input with serialized bytes
276    * @param <T>    deserialized Object type
277    * @return deserialized Object
278    */
279   public <T> T deserialize(final ByteBuffer buffer) {
280     ByteBufferInput in = new ByteBufferInput(buffer);
281     Kryo kryo = borrow();
282     try {
283       @SuppressWarnings("unchecked")
284       T obj = (T) kryo.readClassAndObject(in);
285       return obj;
286     } finally {
287       release(kryo);
288     }
289   }
290
291   /**
292    * Deserializes given InputStream to an Object using Kryo instance in pool.
293    *
294    * @param stream input stream
295    * @param <T>    deserialized Object type
296    * @return deserialized Object
297    */
298   public <T> T deserialize(final InputStream stream) {
299     return deserialize(stream, DEFAULT_BUFFER_SIZE);
300   }
301
302   /**
303    * Deserializes given InputStream to an Object using Kryo instance in pool.
304    *
305    * @param stream     input stream
306    * @param <T>        deserialized Object type
307    * @param bufferSize size of the buffer in front of the stream
308    * @return deserialized Object
309    */
310   public <T> T deserialize(final InputStream stream, final int bufferSize) {
311     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
312     Kryo kryo = borrow();
313     try {
314       @SuppressWarnings("unchecked")
315       T obj = (T) kryo.readClassAndObject(in);
316       return obj;
317     } finally {
318       release(kryo);
319     }
320   }
321
322   /**
323    * Gets the number of classes registered in this Kryo namespace.
324    *
325    * @return size of namespace
326    */
327   public int size() {
328     return (int) registeredBlocks.stream()
329         .flatMap(block -> block.types().stream())
330         .count();
331   }
332
333   /**
334    * Creates a Kryo instance.
335    *
336    * @return Kryo instance
337    */
338   @Override
339   public Kryo create() {
340     LOGGER.trace("Creating Kryo instance for {}", this);
341     Kryo kryo = new Kryo();
342     kryo.setClassLoader(classLoader);
343     kryo.setRegistrationRequired(true);
344
345     // TODO rethink whether we want to use StdInstantiatorStrategy
346     kryo.setInstantiatorStrategy(
347         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
348
349     for (RegistrationBlock block : registeredBlocks) {
350       int id = block.begin();
351       if (id == FLOATING_ID) {
352         id = kryo.getNextRegistrationId();
353       }
354       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
355         register(kryo, entry.getKey(), entry.getValue(), id++);
356       }
357     }
358     return kryo;
359   }
360
361   /**
362    * Register {@code type} and {@code serializer} to {@code kryo} instance.
363    *
364    * @param kryo       Kryo instance
365    * @param types      types to register
366    * @param serializer Specific serializer to register or null to use default.
367    * @param id         type registration id to use
368    */
369   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
370     Registration existing = kryo.getRegistration(id);
371     if (existing != null) {
372       boolean matches = false;
373       for (Class<?> type : types) {
374         if (existing.getType() == type) {
375           matches = true;
376           break;
377         }
378       }
379
380       if (!matches) {
381         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
382             friendlyName, types, id, existing.getType());
383
384         throw new IllegalStateException(String.format(
385             "Failed to register %s as %s, %s was already registered.",
386             Arrays.toString(types), id, existing.getType()));
387       }
388       // falling through to register call for now.
389       // Consider skipping, if there's reasonable
390       // way to compare serializer equivalence.
391     }
392
393     for (Class<?> type : types) {
394       Registration r = null;
395       if (serializer == null) {
396         r = kryo.register(type, id);
397       } else if (type.isInterface()) {
398         kryo.addDefaultSerializer(type, serializer);
399       } else {
400         r = kryo.register(type, serializer, id);
401       }
402       if (r != null) {
403         if (r.getId() != id) {
404           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
405               friendlyName, r.getType(), r.getId(), id);
406         }
407         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
408       }
409     }
410   }
411
412   @Override
413   public Kryo borrow() {
414     return kryoPool.borrow();
415   }
416
417   @Override
418   public void release(Kryo kryo) {
419     kryoPool.release(kryo);
420   }
421
422   @Override
423   public <T> T run(KryoCallback<T> callback) {
424     return kryoPool.run(callback);
425   }
426
427   @Override
428   public String toString() {
429     if (!NO_NAME.equals(friendlyName)) {
430       return MoreObjects.toStringHelper(getClass())
431           .omitNullValues()
432           .add("friendlyName", friendlyName)
433           // omit lengthy detail, when there's a name
434           .toString();
435     }
436     return MoreObjects.toStringHelper(getClass())
437         .add("registeredBlocks", registeredBlocks)
438         .toString();
439   }
440
441   static final class RegistrationBlock {
442     private final int begin;
443     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
444
445     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
446       this.begin = begin;
447       this.types = ImmutableList.copyOf(types);
448     }
449
450     public int begin() {
451       return begin;
452     }
453
454     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
455       return types;
456     }
457
458     @Override
459     public String toString() {
460       return MoreObjects.toStringHelper(getClass())
461           .add("begin", begin)
462           .add("types", types)
463           .toString();
464     }
465
466     @Override
467     public int hashCode() {
468       return types.hashCode();
469     }
470
471     // Only the registered types are used for equality.
472     @Override
473     public boolean equals(Object obj) {
474       if (this == obj) {
475         return true;
476       }
477
478       if (obj instanceof RegistrationBlock) {
479         RegistrationBlock that = (RegistrationBlock) obj;
480         return Objects.equals(this.types, that.types);
481       }
482       return false;
483     }
484   }
485 }