Remove Namespace.MAX_BUFFER_SIZE
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.serializer;
17
18 import com.esotericsoftware.kryo.Kryo;
19 import com.esotericsoftware.kryo.Registration;
20 import com.esotericsoftware.kryo.Serializer;
21 import com.esotericsoftware.kryo.io.ByteBufferInput;
22 import com.esotericsoftware.kryo.io.ByteBufferOutput;
23 import com.esotericsoftware.kryo.pool.KryoCallback;
24 import com.esotericsoftware.kryo.pool.KryoFactory;
25 import com.esotericsoftware.kryo.pool.KryoPool;
26 import com.google.common.base.MoreObjects;
27 import com.google.common.collect.ImmutableList;
28 import org.objenesis.strategy.StdInstantiatorStrategy;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42
43 import static java.util.Objects.requireNonNull;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 //@ThreadSafe
49 public final class Namespace implements KryoFactory, KryoPool {
50
51   /**
52    * Default buffer size used for serialization.
53    *
54    * @see #serialize(Object)
55    */
56   public static final int DEFAULT_BUFFER_SIZE = 4096;
57
58   /**
59    * ID to use if this KryoNamespace does not define registration id.
60    */
61   private static final int FLOATING_ID = -1;
62
63   /**
64    * Smallest ID free to use for user defined registrations.
65    */
66   private static final int INITIAL_ID = 16;
67
68   static final String NO_NAME = "(no name)";
69
70   private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
71
72   /**
73    * Default Kryo namespace.
74    */
75   public static final Namespace DEFAULT = builder().build();
76
77   private final KryoPool kryoPool = new KryoPool.Builder(this)
78       .softReferences()
79       .build();
80
81   private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
82   private final KryoInputPool kryoInputPool = new KryoInputPool();
83
84   private final ImmutableList<RegistrationBlock> registeredBlocks;
85
86   private final ClassLoader classLoader;
87   private final boolean registrationRequired;
88   private final String friendlyName;
89
90   /**
91    * KryoNamespace builder.
92    */
93   //@NotThreadSafe
94   public static final class Builder {
95     private int blockHeadId = INITIAL_ID;
96     private List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
97     private List<RegistrationBlock> blocks = new ArrayList<>();
98     private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
99     private boolean registrationRequired = true;
100
101     /**
102      * Builds a {@link Namespace} instance.
103      *
104      * @return KryoNamespace
105      */
106     public Namespace build() {
107       return build(NO_NAME);
108     }
109
110     /**
111      * Builds a {@link Namespace} instance.
112      *
113      * @param friendlyName friendly name for the namespace
114      * @return KryoNamespace
115      */
116     public Namespace build(String friendlyName) {
117       if (!types.isEmpty()) {
118         blocks.add(new RegistrationBlock(this.blockHeadId, types));
119       }
120       return new Namespace(blocks, classLoader, registrationRequired, friendlyName).populate(1);
121     }
122
123     /**
124      * Registers serializer for the given set of classes.
125      * <p>
126      * When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
127      * all instances will be serialized with the same type ID.
128      *
129      * @param classes    list of classes to register
130      * @param serializer serializer to use for the class
131      * @return this
132      */
133     public Builder register(Serializer<?> serializer, final Class<?>... classes) {
134       types.add(Map.entry(classes, serializer));
135       return this;
136     }
137
138     /**
139      * Sets the namespace class loader.
140      *
141      * @param classLoader the namespace class loader
142      * @return the namespace builder
143      */
144     public Builder setClassLoader(ClassLoader classLoader) {
145       this.classLoader = classLoader;
146       return this;
147     }
148
149     /**
150      * Sets the registrationRequired flag.
151      *
152      * @param registrationRequired Kryo's registrationRequired flag
153      * @return this
154      * @see Kryo#setRegistrationRequired(boolean)
155      */
156     public Builder setRegistrationRequired(boolean registrationRequired) {
157       this.registrationRequired = registrationRequired;
158       return this;
159     }
160   }
161
162   /**
163    * Creates a new {@link Namespace} builder.
164    *
165    * @return builder
166    */
167   public static Builder builder() {
168     return new Builder();
169   }
170
171   /**
172    * Creates a Kryo instance pool.
173    *
174    * @param registeredTypes      types to register
175    * @param registrationRequired whether registration is required
176    * @param friendlyName         friendly name for the namespace
177    */
178   private Namespace(
179       final List<RegistrationBlock> registeredTypes,
180       ClassLoader classLoader,
181       boolean registrationRequired,
182       String friendlyName) {
183     this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
184     this.registrationRequired = registrationRequired;
185     this.classLoader = classLoader;
186     this.friendlyName = requireNonNull(friendlyName);
187   }
188
189   /**
190    * Populates the Kryo pool.
191    *
192    * @param instances to add to the pool
193    * @return this
194    */
195   public Namespace populate(int instances) {
196
197     for (int i = 0; i < instances; ++i) {
198       release(create());
199     }
200     return this;
201   }
202
203   /**
204    * Serializes given object to byte array using Kryo instance in pool.
205    * <p>
206    * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
207    *
208    * @param obj Object to serialize
209    * @return serialized bytes
210    */
211   public byte[] serialize(final Object obj) {
212     return serialize(obj, DEFAULT_BUFFER_SIZE);
213   }
214
215   /**
216    * Serializes given object to byte array using Kryo instance in pool.
217    *
218    * @param obj        Object to serialize
219    * @param bufferSize maximum size of serialized bytes
220    * @return serialized bytes
221    */
222   public byte[] serialize(final Object obj, final int bufferSize) {
223     return kryoOutputPool.run(output -> {
224       return kryoPool.run(kryo -> {
225         kryo.writeClassAndObject(output, obj);
226         output.flush();
227         return output.getByteArrayOutputStream().toByteArray();
228       });
229     }, bufferSize);
230   }
231
232   /**
233    * Serializes given object to byte buffer using Kryo instance in pool.
234    *
235    * @param obj    Object to serialize
236    * @param buffer to write to
237    */
238   public void serialize(final Object obj, final ByteBuffer buffer) {
239     ByteBufferOutput out = new ByteBufferOutput(buffer);
240     Kryo kryo = borrow();
241     try {
242       kryo.writeClassAndObject(out, obj);
243       out.flush();
244     } finally {
245       release(kryo);
246     }
247   }
248
249   /**
250    * Serializes given object to OutputStream using Kryo instance in pool.
251    *
252    * @param obj    Object to serialize
253    * @param stream to write to
254    */
255   public void serialize(final Object obj, final OutputStream stream) {
256     serialize(obj, stream, DEFAULT_BUFFER_SIZE);
257   }
258
259   /**
260    * Serializes given object to OutputStream using Kryo instance in pool.
261    *
262    * @param obj        Object to serialize
263    * @param stream     to write to
264    * @param bufferSize size of the buffer in front of the stream
265    */
266   public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
267     ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
268     Kryo kryo = borrow();
269     try {
270       kryo.writeClassAndObject(out, obj);
271       out.flush();
272     } finally {
273       release(kryo);
274     }
275   }
276
277   /**
278    * Deserializes given byte array to Object using Kryo instance in pool.
279    *
280    * @param bytes serialized bytes
281    * @param <T>   deserialized Object type
282    * @return deserialized Object
283    */
284   public <T> T deserialize(final byte[] bytes) {
285     return kryoInputPool.run(input -> {
286       input.setInputStream(new ByteArrayInputStream(bytes));
287       return kryoPool.run(kryo -> {
288         @SuppressWarnings("unchecked")
289         T obj = (T) kryo.readClassAndObject(input);
290         return obj;
291       });
292     }, DEFAULT_BUFFER_SIZE);
293   }
294
295   /**
296    * Deserializes given byte buffer to Object using Kryo instance in pool.
297    *
298    * @param buffer input with serialized bytes
299    * @param <T>    deserialized Object type
300    * @return deserialized Object
301    */
302   public <T> T deserialize(final ByteBuffer buffer) {
303     ByteBufferInput in = new ByteBufferInput(buffer);
304     Kryo kryo = borrow();
305     try {
306       @SuppressWarnings("unchecked")
307       T obj = (T) kryo.readClassAndObject(in);
308       return obj;
309     } finally {
310       release(kryo);
311     }
312   }
313
314   /**
315    * Deserializes given InputStream to an Object using Kryo instance in pool.
316    *
317    * @param stream input stream
318    * @param <T>    deserialized Object type
319    * @return deserialized Object
320    */
321   public <T> T deserialize(final InputStream stream) {
322     return deserialize(stream, DEFAULT_BUFFER_SIZE);
323   }
324
325   /**
326    * Deserializes given InputStream to an Object using Kryo instance in pool.
327    *
328    * @param stream     input stream
329    * @param <T>        deserialized Object type
330    * @param bufferSize size of the buffer in front of the stream
331    * @return deserialized Object
332    */
333   public <T> T deserialize(final InputStream stream, final int bufferSize) {
334     ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
335     Kryo kryo = borrow();
336     try {
337       @SuppressWarnings("unchecked")
338       T obj = (T) kryo.readClassAndObject(in);
339       return obj;
340     } finally {
341       release(kryo);
342     }
343   }
344
345   private String friendlyName() {
346     return friendlyName;
347   }
348
349   /**
350    * Gets the number of classes registered in this Kryo namespace.
351    *
352    * @return size of namespace
353    */
354   public int size() {
355     return (int) registeredBlocks.stream()
356         .flatMap(block -> block.types().stream())
357         .count();
358   }
359
360   /**
361    * Creates a Kryo instance.
362    *
363    * @return Kryo instance
364    */
365   @Override
366   public Kryo create() {
367     LOGGER.trace("Creating Kryo instance for {}", this);
368     Kryo kryo = new Kryo();
369     kryo.setClassLoader(classLoader);
370     kryo.setRegistrationRequired(registrationRequired);
371
372     // TODO rethink whether we want to use StdInstantiatorStrategy
373     kryo.setInstantiatorStrategy(
374         new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
375
376     for (RegistrationBlock block : registeredBlocks) {
377       int id = block.begin();
378       if (id == FLOATING_ID) {
379         id = kryo.getNextRegistrationId();
380       }
381       for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
382         register(kryo, entry.getKey(), entry.getValue(), id++);
383       }
384     }
385     return kryo;
386   }
387
388   /**
389    * Register {@code type} and {@code serializer} to {@code kryo} instance.
390    *
391    * @param kryo       Kryo instance
392    * @param types      types to register
393    * @param serializer Specific serializer to register or null to use default.
394    * @param id         type registration id to use
395    */
396   private void register(Kryo kryo, Class<?>[] types, Serializer<?> serializer, int id) {
397     Registration existing = kryo.getRegistration(id);
398     if (existing != null) {
399       boolean matches = false;
400       for (Class<?> type : types) {
401         if (existing.getType() == type) {
402           matches = true;
403           break;
404         }
405       }
406
407       if (!matches) {
408         LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
409             friendlyName(), types, id, existing.getType());
410
411         throw new IllegalStateException(String.format(
412             "Failed to register %s as %s, %s was already registered.",
413             Arrays.toString(types), id, existing.getType()));
414       }
415       // falling through to register call for now.
416       // Consider skipping, if there's reasonable
417       // way to compare serializer equivalence.
418     }
419
420     for (Class<?> type : types) {
421       Registration r = null;
422       if (serializer == null) {
423         r = kryo.register(type, id);
424       } else if (type.isInterface()) {
425         kryo.addDefaultSerializer(type, serializer);
426       } else {
427         r = kryo.register(type, serializer, id);
428       }
429       if (r != null) {
430         if (r.getId() != id) {
431           LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
432               friendlyName(), r.getType(), r.getId(), id);
433         }
434         LOGGER.trace("{} registered as {}", r.getType(), r.getId());
435       }
436     }
437   }
438
439   @Override
440   public Kryo borrow() {
441     return kryoPool.borrow();
442   }
443
444   @Override
445   public void release(Kryo kryo) {
446     kryoPool.release(kryo);
447   }
448
449   @Override
450   public <T> T run(KryoCallback<T> callback) {
451     return kryoPool.run(callback);
452   }
453
454   @Override
455   public String toString() {
456     if (!NO_NAME.equals(friendlyName)) {
457       return MoreObjects.toStringHelper(getClass())
458           .omitNullValues()
459           .add("friendlyName", friendlyName)
460           // omit lengthy detail, when there's a name
461           .toString();
462     }
463     return MoreObjects.toStringHelper(getClass())
464         .add("registeredBlocks", registeredBlocks)
465         .toString();
466   }
467
468   static final class RegistrationBlock {
469     private final int begin;
470     private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
471
472     RegistrationBlock(int begin, List<Entry<Class<?>[], Serializer<?>>> types) {
473       this.begin = begin;
474       this.types = ImmutableList.copyOf(types);
475     }
476
477     public int begin() {
478       return begin;
479     }
480
481     public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
482       return types;
483     }
484
485     @Override
486     public String toString() {
487       return MoreObjects.toStringHelper(getClass())
488           .add("begin", begin)
489           .add("types", types)
490           .toString();
491     }
492
493     @Override
494     public int hashCode() {
495       return types.hashCode();
496     }
497
498     // Only the registered types are used for equality.
499     @Override
500     public boolean equals(Object obj) {
501       if (this == obj) {
502         return true;
503       }
504
505       if (obj instanceof RegistrationBlock) {
506         RegistrationBlock that = (RegistrationBlock) obj;
507         return Objects.equals(this.types, that.types);
508       }
509       return false;
510     }
511   }
512 }