Add RegisteredType
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import com.esotericsoftware.kryo.Kryo;
23 import com.esotericsoftware.kryo.Registration;
24 import com.esotericsoftware.kryo.Serializer;
25 import com.esotericsoftware.kryo.io.ByteBufferInput;
26 import com.esotericsoftware.kryo.io.ByteBufferOutput;
27 import com.esotericsoftware.kryo.pool.KryoCallback;
28 import com.esotericsoftware.kryo.pool.KryoFactory;
29 import com.esotericsoftware.kryo.pool.KryoPool;
30 import com.google.common.base.MoreObjects;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import org.objenesis.strategy.StdInstantiatorStrategy;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Pool of Kryo instances, with classes pre-registered.
45  */
46 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
47     /**
48      * Default buffer size used for serialization.
49      *
50      * @see #serialize(Object)
51      */
52     private static final int DEFAULT_BUFFER_SIZE = 4096;
53
54     /**
55      * Smallest ID free to use for user defined registrations.
56      */
57     private static final int INITIAL_ID = 16;
58
59     private static final String NO_NAME = "(no name)";
60
61     private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
62
63     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
64
65     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
66     private final KryoInputPool kryoInputPool = new KryoInputPool();
67
68     private final List<RegisteredType> registeredTypes;
69     private final ClassLoader classLoader;
70     private final String friendlyName;
71
72     /**
73      * Creates a Kryo instance pool.
74      *
75      * @param registeredTypes      types to register
76      * @param registrationRequired whether registration is required
77      * @param friendlyName         friendly name for the namespace
78      */
79     private Namespace(
80             final List<RegisteredType> registeredTypes,
81             final ClassLoader classLoader,
82             final String friendlyName) {
83         this.registeredTypes = List.copyOf(registeredTypes);
84         this.classLoader = requireNonNull(classLoader);
85         this.friendlyName = requireNonNull(friendlyName);
86
87         // Pre-populate with a single instance
88         release(create());
89     }
90
91     /**
92      * Creates a new {@link Namespace} builder.
93      *
94      * @return builder
95      */
96     public static JournalSerdes.Builder builder() {
97         return new Builder();
98     }
99
100     @Override
101     public byte[] serialize(final Object obj) {
102         return serialize(obj, DEFAULT_BUFFER_SIZE);
103     }
104
105     @Override
106     public byte[] serialize(final Object obj, final int bufferSize) {
107         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
108             kryo.writeClassAndObject(output, obj);
109             output.flush();
110             return output.getByteArrayOutputStream().toByteArray();
111         }), bufferSize);
112     }
113
114     @Override
115     public void serialize(final Object obj, final ByteBuffer buffer) {
116         ByteBufferOutput out = new ByteBufferOutput(buffer);
117         Kryo kryo = borrow();
118         try {
119             kryo.writeClassAndObject(out, obj);
120             out.flush();
121         } finally {
122             release(kryo);
123         }
124     }
125
126     @Override
127     public void serialize(final Object obj, final OutputStream stream) {
128         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
129     }
130
131     @Override
132     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
133         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
134         Kryo kryo = borrow();
135         try {
136             kryo.writeClassAndObject(out, obj);
137             out.flush();
138         } finally {
139             release(kryo);
140         }
141     }
142
143     @Override
144     public <T> T deserialize(final byte[] bytes) {
145         return kryoInputPool.run(input -> {
146             input.setInputStream(new ByteArrayInputStream(bytes));
147             return kryoPool.run(kryo -> {
148                 @SuppressWarnings("unchecked")
149                 T obj = (T) kryo.readClassAndObject(input);
150                 return obj;
151             });
152         }, DEFAULT_BUFFER_SIZE);
153     }
154
155     @Override
156     public <T> T deserialize(final ByteBuffer buffer) {
157         ByteBufferInput in = new ByteBufferInput(buffer);
158         Kryo kryo = borrow();
159         try {
160             @SuppressWarnings("unchecked")
161             T obj = (T) kryo.readClassAndObject(in);
162             return obj;
163         } finally {
164             release(kryo);
165         }
166     }
167
168     @Override
169     public <T> T deserialize(final InputStream stream) {
170         return deserialize(stream, DEFAULT_BUFFER_SIZE);
171     }
172
173     @Override
174     public <T> T deserialize(final InputStream stream, final int bufferSize) {
175         ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
176         Kryo kryo = borrow();
177         try {
178             @SuppressWarnings("unchecked")
179             T obj = (T) kryo.readClassAndObject(in);
180             return obj;
181         } finally {
182             release(kryo);
183         }
184     }
185
186     /**
187      * Creates a Kryo instance.
188      *
189      * @return Kryo instance
190      */
191     @Override
192     public Kryo create() {
193         LOGGER.trace("Creating Kryo instance for {}", this);
194         Kryo kryo = new Kryo();
195         kryo.setClassLoader(classLoader);
196         kryo.setRegistrationRequired(true);
197
198         // TODO rethink whether we want to use StdInstantiatorStrategy
199         kryo.setInstantiatorStrategy(
200             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
201
202         int id = INITIAL_ID;
203         for (RegisteredType registeredType : registeredTypes) {
204             register(kryo, registeredType.types(), registeredType.serializer(), id++);
205         }
206         return kryo;
207     }
208
209     /**
210      * Register {@code type} and {@code serializer} to {@code kryo} instance.
211      *
212      * @param kryo       Kryo instance
213      * @param types      types to register
214      * @param serializer Specific serializer to register or null to use default.
215      * @param id         type registration id to use
216      */
217     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
218         Registration existing = kryo.getRegistration(id);
219         if (existing != null) {
220             boolean matches = false;
221             for (Class<?> type : types) {
222                 if (existing.getType() == type) {
223                     matches = true;
224                     break;
225                 }
226             }
227
228             if (!matches) {
229                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
230                     friendlyName, types, id, existing.getType());
231
232                 throw new IllegalStateException(String.format(
233                     "Failed to register %s as %s, %s was already registered.",
234                     Arrays.toString(types), id, existing.getType()));
235             }
236             // falling through to register call for now.
237             // Consider skipping, if there's reasonable
238             // way to compare serializer equivalence.
239         }
240
241         for (Class<?> type : types) {
242             Registration r = null;
243             if (serializer == null) {
244                 r = kryo.register(type, id);
245             } else if (type.isInterface()) {
246                 kryo.addDefaultSerializer(type, serializer);
247             } else {
248                 r = kryo.register(type, serializer, id);
249             }
250             if (r != null) {
251                 if (r.getId() != id) {
252                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
253                         friendlyName, r.getType(), r.getId(), id);
254                 }
255                 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
256             }
257         }
258     }
259
260     @Override
261     public Kryo borrow() {
262         return kryoPool.borrow();
263     }
264
265     @Override
266     public void release(final Kryo kryo) {
267         kryoPool.release(kryo);
268     }
269
270     @Override
271     public <T> T run(final KryoCallback<T> callback) {
272         return kryoPool.run(callback);
273     }
274
275     @Override
276     public String toString() {
277         if (!NO_NAME.equals(friendlyName)) {
278             return MoreObjects.toStringHelper(getClass())
279                 .omitNullValues()
280                 .add("friendlyName", friendlyName)
281                 // omit lengthy detail, when there's a name
282                 .toString();
283         }
284         return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
285     }
286
287     private static record RegisteredType(EntrySerializer<?> serializer, Class<?>[] types) {
288         RegisteredType {
289             requireNonNull(serializer);
290             requireNonNull(types);
291         }
292     }
293
294     private static final class Builder implements JournalSerdes.Builder {
295         private final List<RegisteredType> types = new ArrayList<>();
296         private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
297
298         @Override
299         public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
300             types.add(new RegisteredType(new EntrySerializer<>(serdes), classes));
301             return this;
302         }
303
304         @Override
305         public Builder setClassLoader(final ClassLoader classLoader) {
306             this.classLoader = requireNonNull(classLoader);
307             return this;
308         }
309
310         @Override
311         public JournalSerdes build() {
312             return build(NO_NAME);
313         }
314
315         @Override
316         public JournalSerdes build(final String friendlyName) {
317             checkState(!types.isEmpty(), "No serializers registered");
318             return new Namespace(types, classLoader, friendlyName);
319         }
320     }
321 }