Do not leak Kryo from atomix.storage
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.collect.ImmutableList;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42 import org.objenesis.strategy.StdInstantiatorStrategy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Pool of Kryo instances, with classes pre-registered.
48  */
49 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
50     /**
51      * Default buffer size used for serialization.
52      *
53      * @see #serialize(Object)
54      */
55     private static final int DEFAULT_BUFFER_SIZE = 4096;
56
57     /**
58      * ID to use if this KryoNamespace does not define registration id.
59      */
60     private static final int FLOATING_ID = -1;
61
62     /**
63      * Smallest ID free to use for user defined registrations.
64      */
65     private static final int INITIAL_ID = 16;
66
67     private static final String NO_NAME = "(no name)";
68
69     private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
70
71     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
72
73     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
74     private final KryoInputPool kryoInputPool = new KryoInputPool();
75
76     private final ImmutableList<RegistrationBlock> registeredBlocks;
77
78     private final ClassLoader classLoader;
79     private final String friendlyName;
80
81     /**
82      * KryoNamespace builder.
83      */
84     private static final class Builder implements JournalSerdes.Builder {
85         private final int blockHeadId = INITIAL_ID;
86         private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
87         private final List<RegistrationBlock> blocks = new ArrayList<>();
88         private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
89
90         @Override
91         public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
92             types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
93             return this;
94         }
95
96         @Override
97         public Builder setClassLoader(final ClassLoader classLoader) {
98             this.classLoader = requireNonNull(classLoader);
99             return this;
100         }
101
102         @Override
103         public JournalSerdes build() {
104             return build(NO_NAME);
105         }
106
107         @Override
108         public JournalSerdes build(final String friendlyName) {
109             if (!types.isEmpty()) {
110                 blocks.add(new RegistrationBlock(blockHeadId, types));
111             }
112             return new Namespace(blocks, classLoader, friendlyName);
113         }
114     }
115
116     /**
117      * Creates a new {@link Namespace} builder.
118      *
119      * @return builder
120      */
121     public static JournalSerdes.Builder builder() {
122         return new Builder();
123     }
124
125     /**
126      * Creates a Kryo instance pool.
127      *
128      * @param registeredTypes      types to register
129      * @param registrationRequired whether registration is required
130      * @param friendlyName         friendly name for the namespace
131      */
132     private Namespace(
133         final List<RegistrationBlock> registeredTypes,
134         final ClassLoader classLoader,
135         final String friendlyName) {
136         registeredBlocks = ImmutableList.copyOf(registeredTypes);
137         this.classLoader = classLoader;
138         this.friendlyName = requireNonNull(friendlyName);
139
140         // Pre-populate with a single instance
141         release(create());
142     }
143
144     @Override
145     public byte[] serialize(final Object obj) {
146         return serialize(obj, DEFAULT_BUFFER_SIZE);
147     }
148
149     @Override
150     public byte[] serialize(final Object obj, final int bufferSize) {
151         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
152             kryo.writeClassAndObject(output, obj);
153             output.flush();
154             return output.getByteArrayOutputStream().toByteArray();
155         }), bufferSize);
156     }
157
158     @Override
159     public void serialize(final Object obj, final ByteBuffer buffer) {
160         ByteBufferOutput out = new ByteBufferOutput(buffer);
161         Kryo kryo = borrow();
162         try {
163             kryo.writeClassAndObject(out, obj);
164             out.flush();
165         } finally {
166             release(kryo);
167         }
168     }
169
170     @Override
171     public void serialize(final Object obj, final OutputStream stream) {
172         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
173     }
174
175     @Override
176     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
177         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
178         Kryo kryo = borrow();
179         try {
180             kryo.writeClassAndObject(out, obj);
181             out.flush();
182         } finally {
183             release(kryo);
184         }
185     }
186
187     @Override
188     public <T> T deserialize(final byte[] bytes) {
189         return kryoInputPool.run(input -> {
190             input.setInputStream(new ByteArrayInputStream(bytes));
191             return kryoPool.run(kryo -> {
192                 @SuppressWarnings("unchecked")
193                 T obj = (T) kryo.readClassAndObject(input);
194                 return obj;
195             });
196         }, DEFAULT_BUFFER_SIZE);
197     }
198
199     @Override
200     public <T> T deserialize(final ByteBuffer buffer) {
201         ByteBufferInput in = new ByteBufferInput(buffer);
202         Kryo kryo = borrow();
203         try {
204             @SuppressWarnings("unchecked")
205             T obj = (T) kryo.readClassAndObject(in);
206             return obj;
207         } finally {
208             release(kryo);
209         }
210     }
211
212     @Override
213     public <T> T deserialize(final InputStream stream) {
214         return deserialize(stream, DEFAULT_BUFFER_SIZE);
215     }
216
217     @Override
218     public <T> T deserialize(final InputStream stream, final int bufferSize) {
219         ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
220         Kryo kryo = borrow();
221         try {
222             @SuppressWarnings("unchecked")
223             T obj = (T) kryo.readClassAndObject(in);
224             return obj;
225         } finally {
226             release(kryo);
227         }
228     }
229
230     /**
231      * Creates a Kryo instance.
232      *
233      * @return Kryo instance
234      */
235     @Override
236     public Kryo create() {
237         LOGGER.trace("Creating Kryo instance for {}", this);
238         Kryo kryo = new Kryo();
239         kryo.setClassLoader(classLoader);
240         kryo.setRegistrationRequired(true);
241
242         // TODO rethink whether we want to use StdInstantiatorStrategy
243         kryo.setInstantiatorStrategy(
244             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
245
246         for (RegistrationBlock block : registeredBlocks) {
247             int id = block.begin();
248             if (id == FLOATING_ID) {
249                 id = kryo.getNextRegistrationId();
250             }
251             for (Entry<Class<?>[], EntrySerializer<?>> entry : block.types()) {
252                 register(kryo, entry.getKey(), entry.getValue(), id++);
253             }
254         }
255         return kryo;
256     }
257
258     /**
259      * Register {@code type} and {@code serializer} to {@code kryo} instance.
260      *
261      * @param kryo       Kryo instance
262      * @param types      types to register
263      * @param serializer Specific serializer to register or null to use default.
264      * @param id         type registration id to use
265      */
266     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
267         Registration existing = kryo.getRegistration(id);
268         if (existing != null) {
269             boolean matches = false;
270             for (Class<?> type : types) {
271                 if (existing.getType() == type) {
272                     matches = true;
273                     break;
274                 }
275             }
276
277             if (!matches) {
278                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
279                     friendlyName, types, id, existing.getType());
280
281                 throw new IllegalStateException(String.format(
282                     "Failed to register %s as %s, %s was already registered.",
283                     Arrays.toString(types), id, existing.getType()));
284             }
285             // falling through to register call for now.
286             // Consider skipping, if there's reasonable
287             // way to compare serializer equivalence.
288         }
289
290         for (Class<?> type : types) {
291             Registration r = null;
292             if (serializer == null) {
293                 r = kryo.register(type, id);
294             } else if (type.isInterface()) {
295                 kryo.addDefaultSerializer(type, serializer);
296             } else {
297                 r = kryo.register(type, serializer, id);
298             }
299             if (r != null) {
300                 if (r.getId() != id) {
301                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
302                         friendlyName, r.getType(), r.getId(), id);
303                 }
304                 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
305             }
306         }
307     }
308
309     @Override
310     public Kryo borrow() {
311         return kryoPool.borrow();
312     }
313
314     @Override
315     public void release(final Kryo kryo) {
316         kryoPool.release(kryo);
317     }
318
319     @Override
320     public <T> T run(final KryoCallback<T> callback) {
321         return kryoPool.run(callback);
322     }
323
324     @Override
325     public String toString() {
326         if (!NO_NAME.equals(friendlyName)) {
327             return MoreObjects.toStringHelper(getClass())
328                 .omitNullValues()
329                 .add("friendlyName", friendlyName)
330                 // omit lengthy detail, when there's a name
331                 .toString();
332         }
333         return MoreObjects.toStringHelper(getClass()).add("registeredBlocks", registeredBlocks).toString();
334     }
335
336     static final class RegistrationBlock {
337         private final int begin;
338         private final ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types;
339
340         RegistrationBlock(final int begin, final List<Entry<Class<?>[], EntrySerializer<?>>> types) {
341             this.begin = begin;
342             this.types = ImmutableList.copyOf(types);
343         }
344
345         public int begin() {
346             return begin;
347         }
348
349         public ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types() {
350             return types;
351         }
352
353         @Override
354         public String toString() {
355             return MoreObjects.toStringHelper(getClass()).add("begin", begin).add("types", types).toString();
356         }
357
358         @Override
359         public int hashCode() {
360             return types.hashCode();
361         }
362
363         // Only the registered types are used for equality.
364         @Override
365         public boolean equals(final Object obj) {
366             if (this == obj) {
367                 return true;
368             }
369
370             if (obj instanceof RegistrationBlock that) {
371                 return Objects.equals(types, that.types);
372             }
373             return false;
374         }
375     }
376 }