866068b83e3a1d5c56b5a00a4195d194c315b89a
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import com.esotericsoftware.kryo.Kryo;
23 import com.esotericsoftware.kryo.Registration;
24 import com.esotericsoftware.kryo.Serializer;
25 import com.esotericsoftware.kryo.io.ByteBufferInput;
26 import com.esotericsoftware.kryo.io.ByteBufferOutput;
27 import com.esotericsoftware.kryo.pool.KryoCallback;
28 import com.esotericsoftware.kryo.pool.KryoFactory;
29 import com.esotericsoftware.kryo.pool.KryoPool;
30 import com.google.common.base.MoreObjects;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import org.objenesis.strategy.StdInstantiatorStrategy;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Pool of Kryo instances, with classes pre-registered.
47  */
48 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
49     /**
50      * Default buffer size used for serialization.
51      *
52      * @see #serialize(Object)
53      */
54     private static final int DEFAULT_BUFFER_SIZE = 4096;
55
56     /**
57      * Smallest ID free to use for user defined registrations.
58      */
59     private static final int INITIAL_ID = 16;
60
61     private static final String NO_NAME = "(no name)";
62
63     private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
64
65     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
66
67     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
68     private final KryoInputPool kryoInputPool = new KryoInputPool();
69
70     private final List<Entry<Class<?>[], EntrySerializer<?>>> registeredTypes;
71
72     private final ClassLoader classLoader;
73     private final String friendlyName;
74
75     /**
76      * Creates a Kryo instance pool.
77      *
78      * @param registeredTypes      types to register
79      * @param registrationRequired whether registration is required
80      * @param friendlyName         friendly name for the namespace
81      */
82     private Namespace(
83             final List<Entry<Class<?>[], EntrySerializer<?>>> registeredTypes,
84             final ClassLoader classLoader,
85             final String friendlyName) {
86         this.registeredTypes = List.copyOf(registeredTypes);
87         this.classLoader = requireNonNull(classLoader);
88         this.friendlyName = requireNonNull(friendlyName);
89
90         // Pre-populate with a single instance
91         release(create());
92     }
93
94     /**
95      * Creates a new {@link Namespace} builder.
96      *
97      * @return builder
98      */
99     public static JournalSerdes.Builder builder() {
100         return new Builder();
101     }
102
103     @Override
104     public byte[] serialize(final Object obj) {
105         return serialize(obj, DEFAULT_BUFFER_SIZE);
106     }
107
108     @Override
109     public byte[] serialize(final Object obj, final int bufferSize) {
110         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
111             kryo.writeClassAndObject(output, obj);
112             output.flush();
113             return output.getByteArrayOutputStream().toByteArray();
114         }), bufferSize);
115     }
116
117     @Override
118     public void serialize(final Object obj, final ByteBuffer buffer) {
119         ByteBufferOutput out = new ByteBufferOutput(buffer);
120         Kryo kryo = borrow();
121         try {
122             kryo.writeClassAndObject(out, obj);
123             out.flush();
124         } finally {
125             release(kryo);
126         }
127     }
128
129     @Override
130     public void serialize(final Object obj, final OutputStream stream) {
131         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
132     }
133
134     @Override
135     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
136         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
137         Kryo kryo = borrow();
138         try {
139             kryo.writeClassAndObject(out, obj);
140             out.flush();
141         } finally {
142             release(kryo);
143         }
144     }
145
146     @Override
147     public <T> T deserialize(final byte[] bytes) {
148         return kryoInputPool.run(input -> {
149             input.setInputStream(new ByteArrayInputStream(bytes));
150             return kryoPool.run(kryo -> {
151                 @SuppressWarnings("unchecked")
152                 T obj = (T) kryo.readClassAndObject(input);
153                 return obj;
154             });
155         }, DEFAULT_BUFFER_SIZE);
156     }
157
158     @Override
159     public <T> T deserialize(final ByteBuffer buffer) {
160         ByteBufferInput in = new ByteBufferInput(buffer);
161         Kryo kryo = borrow();
162         try {
163             @SuppressWarnings("unchecked")
164             T obj = (T) kryo.readClassAndObject(in);
165             return obj;
166         } finally {
167             release(kryo);
168         }
169     }
170
171     @Override
172     public <T> T deserialize(final InputStream stream) {
173         return deserialize(stream, DEFAULT_BUFFER_SIZE);
174     }
175
176     @Override
177     public <T> T deserialize(final InputStream stream, final int bufferSize) {
178         ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
179         Kryo kryo = borrow();
180         try {
181             @SuppressWarnings("unchecked")
182             T obj = (T) kryo.readClassAndObject(in);
183             return obj;
184         } finally {
185             release(kryo);
186         }
187     }
188
189     /**
190      * Creates a Kryo instance.
191      *
192      * @return Kryo instance
193      */
194     @Override
195     public Kryo create() {
196         LOGGER.trace("Creating Kryo instance for {}", this);
197         Kryo kryo = new Kryo();
198         kryo.setClassLoader(classLoader);
199         kryo.setRegistrationRequired(true);
200
201         // TODO rethink whether we want to use StdInstantiatorStrategy
202         kryo.setInstantiatorStrategy(
203             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
204
205         int id = INITIAL_ID;
206         for (Entry<Class<?>[], EntrySerializer<?>> entry : registeredTypes) {
207             register(kryo, entry.getKey(), entry.getValue(), id++);
208         }
209         return kryo;
210     }
211
212     /**
213      * Register {@code type} and {@code serializer} to {@code kryo} instance.
214      *
215      * @param kryo       Kryo instance
216      * @param types      types to register
217      * @param serializer Specific serializer to register or null to use default.
218      * @param id         type registration id to use
219      */
220     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
221         Registration existing = kryo.getRegistration(id);
222         if (existing != null) {
223             boolean matches = false;
224             for (Class<?> type : types) {
225                 if (existing.getType() == type) {
226                     matches = true;
227                     break;
228                 }
229             }
230
231             if (!matches) {
232                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
233                     friendlyName, types, id, existing.getType());
234
235                 throw new IllegalStateException(String.format(
236                     "Failed to register %s as %s, %s was already registered.",
237                     Arrays.toString(types), id, existing.getType()));
238             }
239             // falling through to register call for now.
240             // Consider skipping, if there's reasonable
241             // way to compare serializer equivalence.
242         }
243
244         for (Class<?> type : types) {
245             Registration r = null;
246             if (serializer == null) {
247                 r = kryo.register(type, id);
248             } else if (type.isInterface()) {
249                 kryo.addDefaultSerializer(type, serializer);
250             } else {
251                 r = kryo.register(type, serializer, id);
252             }
253             if (r != null) {
254                 if (r.getId() != id) {
255                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
256                         friendlyName, r.getType(), r.getId(), id);
257                 }
258                 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
259             }
260         }
261     }
262
263     @Override
264     public Kryo borrow() {
265         return kryoPool.borrow();
266     }
267
268     @Override
269     public void release(final Kryo kryo) {
270         kryoPool.release(kryo);
271     }
272
273     @Override
274     public <T> T run(final KryoCallback<T> callback) {
275         return kryoPool.run(callback);
276     }
277
278     @Override
279     public String toString() {
280         if (!NO_NAME.equals(friendlyName)) {
281             return MoreObjects.toStringHelper(getClass())
282                 .omitNullValues()
283                 .add("friendlyName", friendlyName)
284                 // omit lengthy detail, when there's a name
285                 .toString();
286         }
287         return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
288     }
289
290     private static final class Builder implements JournalSerdes.Builder {
291         private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
292         private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
293
294         @Override
295         public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
296             types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
297             return this;
298         }
299
300         @Override
301         public Builder setClassLoader(final ClassLoader classLoader) {
302             this.classLoader = requireNonNull(classLoader);
303             return this;
304         }
305
306         @Override
307         public JournalSerdes build() {
308             return build(NO_NAME);
309         }
310
311         @Override
312         public JournalSerdes build(final String friendlyName) {
313             checkState(!types.isEmpty(), "No serializers registered");
314             return new Namespace(types, classLoader, friendlyName);
315         }
316     }
317 }