Remove Namespace.FLOATING_ID
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / Namespace.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import com.google.common.collect.ImmutableList;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Objects;
42 import org.objenesis.strategy.StdInstantiatorStrategy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Pool of Kryo instances, with classes pre-registered.
48  */
49 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
50     /**
51      * Default buffer size used for serialization.
52      *
53      * @see #serialize(Object)
54      */
55     private static final int DEFAULT_BUFFER_SIZE = 4096;
56
57     /**
58      * Smallest ID free to use for user defined registrations.
59      */
60     private static final int INITIAL_ID = 16;
61
62     private static final String NO_NAME = "(no name)";
63
64     private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
65
66     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
67
68     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
69     private final KryoInputPool kryoInputPool = new KryoInputPool();
70
71     private final ImmutableList<RegistrationBlock> registeredBlocks;
72
73     private final ClassLoader classLoader;
74     private final String friendlyName;
75
76     /**
77      * KryoNamespace builder.
78      */
79     private static final class Builder implements JournalSerdes.Builder {
80         private final int blockHeadId = INITIAL_ID;
81         private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
82         private final List<RegistrationBlock> blocks = new ArrayList<>();
83         private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
84
85         @Override
86         public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
87             types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
88             return this;
89         }
90
91         @Override
92         public Builder setClassLoader(final ClassLoader classLoader) {
93             this.classLoader = requireNonNull(classLoader);
94             return this;
95         }
96
97         @Override
98         public JournalSerdes build() {
99             return build(NO_NAME);
100         }
101
102         @Override
103         public JournalSerdes build(final String friendlyName) {
104             if (!types.isEmpty()) {
105                 blocks.add(new RegistrationBlock(blockHeadId, types));
106             }
107             return new Namespace(blocks, classLoader, friendlyName);
108         }
109     }
110
111     /**
112      * Creates a new {@link Namespace} builder.
113      *
114      * @return builder
115      */
116     public static JournalSerdes.Builder builder() {
117         return new Builder();
118     }
119
120     /**
121      * Creates a Kryo instance pool.
122      *
123      * @param registeredTypes      types to register
124      * @param registrationRequired whether registration is required
125      * @param friendlyName         friendly name for the namespace
126      */
127     private Namespace(
128         final List<RegistrationBlock> registeredTypes,
129         final ClassLoader classLoader,
130         final String friendlyName) {
131         registeredBlocks = ImmutableList.copyOf(registeredTypes);
132         this.classLoader = classLoader;
133         this.friendlyName = requireNonNull(friendlyName);
134
135         // Pre-populate with a single instance
136         release(create());
137     }
138
139     @Override
140     public byte[] serialize(final Object obj) {
141         return serialize(obj, DEFAULT_BUFFER_SIZE);
142     }
143
144     @Override
145     public byte[] serialize(final Object obj, final int bufferSize) {
146         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
147             kryo.writeClassAndObject(output, obj);
148             output.flush();
149             return output.getByteArrayOutputStream().toByteArray();
150         }), bufferSize);
151     }
152
153     @Override
154     public void serialize(final Object obj, final ByteBuffer buffer) {
155         ByteBufferOutput out = new ByteBufferOutput(buffer);
156         Kryo kryo = borrow();
157         try {
158             kryo.writeClassAndObject(out, obj);
159             out.flush();
160         } finally {
161             release(kryo);
162         }
163     }
164
165     @Override
166     public void serialize(final Object obj, final OutputStream stream) {
167         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
168     }
169
170     @Override
171     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
172         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
173         Kryo kryo = borrow();
174         try {
175             kryo.writeClassAndObject(out, obj);
176             out.flush();
177         } finally {
178             release(kryo);
179         }
180     }
181
182     @Override
183     public <T> T deserialize(final byte[] bytes) {
184         return kryoInputPool.run(input -> {
185             input.setInputStream(new ByteArrayInputStream(bytes));
186             return kryoPool.run(kryo -> {
187                 @SuppressWarnings("unchecked")
188                 T obj = (T) kryo.readClassAndObject(input);
189                 return obj;
190             });
191         }, DEFAULT_BUFFER_SIZE);
192     }
193
194     @Override
195     public <T> T deserialize(final ByteBuffer buffer) {
196         ByteBufferInput in = new ByteBufferInput(buffer);
197         Kryo kryo = borrow();
198         try {
199             @SuppressWarnings("unchecked")
200             T obj = (T) kryo.readClassAndObject(in);
201             return obj;
202         } finally {
203             release(kryo);
204         }
205     }
206
207     @Override
208     public <T> T deserialize(final InputStream stream) {
209         return deserialize(stream, DEFAULT_BUFFER_SIZE);
210     }
211
212     @Override
213     public <T> T deserialize(final InputStream stream, final int bufferSize) {
214         ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
215         Kryo kryo = borrow();
216         try {
217             @SuppressWarnings("unchecked")
218             T obj = (T) kryo.readClassAndObject(in);
219             return obj;
220         } finally {
221             release(kryo);
222         }
223     }
224
225     /**
226      * Creates a Kryo instance.
227      *
228      * @return Kryo instance
229      */
230     @Override
231     public Kryo create() {
232         LOGGER.trace("Creating Kryo instance for {}", this);
233         Kryo kryo = new Kryo();
234         kryo.setClassLoader(classLoader);
235         kryo.setRegistrationRequired(true);
236
237         // TODO rethink whether we want to use StdInstantiatorStrategy
238         kryo.setInstantiatorStrategy(
239             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
240
241         for (RegistrationBlock block : registeredBlocks) {
242             int id = block.begin();
243             for (Entry<Class<?>[], EntrySerializer<?>> entry : block.types()) {
244                 register(kryo, entry.getKey(), entry.getValue(), id++);
245             }
246         }
247         return kryo;
248     }
249
250     /**
251      * Register {@code type} and {@code serializer} to {@code kryo} instance.
252      *
253      * @param kryo       Kryo instance
254      * @param types      types to register
255      * @param serializer Specific serializer to register or null to use default.
256      * @param id         type registration id to use
257      */
258     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
259         Registration existing = kryo.getRegistration(id);
260         if (existing != null) {
261             boolean matches = false;
262             for (Class<?> type : types) {
263                 if (existing.getType() == type) {
264                     matches = true;
265                     break;
266                 }
267             }
268
269             if (!matches) {
270                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
271                     friendlyName, types, id, existing.getType());
272
273                 throw new IllegalStateException(String.format(
274                     "Failed to register %s as %s, %s was already registered.",
275                     Arrays.toString(types), id, existing.getType()));
276             }
277             // falling through to register call for now.
278             // Consider skipping, if there's reasonable
279             // way to compare serializer equivalence.
280         }
281
282         for (Class<?> type : types) {
283             Registration r = null;
284             if (serializer == null) {
285                 r = kryo.register(type, id);
286             } else if (type.isInterface()) {
287                 kryo.addDefaultSerializer(type, serializer);
288             } else {
289                 r = kryo.register(type, serializer, id);
290             }
291             if (r != null) {
292                 if (r.getId() != id) {
293                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
294                         friendlyName, r.getType(), r.getId(), id);
295                 }
296                 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
297             }
298         }
299     }
300
301     @Override
302     public Kryo borrow() {
303         return kryoPool.borrow();
304     }
305
306     @Override
307     public void release(final Kryo kryo) {
308         kryoPool.release(kryo);
309     }
310
311     @Override
312     public <T> T run(final KryoCallback<T> callback) {
313         return kryoPool.run(callback);
314     }
315
316     @Override
317     public String toString() {
318         if (!NO_NAME.equals(friendlyName)) {
319             return MoreObjects.toStringHelper(getClass())
320                 .omitNullValues()
321                 .add("friendlyName", friendlyName)
322                 // omit lengthy detail, when there's a name
323                 .toString();
324         }
325         return MoreObjects.toStringHelper(getClass()).add("registeredBlocks", registeredBlocks).toString();
326     }
327
328     static final class RegistrationBlock {
329         private final int begin;
330         private final ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types;
331
332         RegistrationBlock(final int begin, final List<Entry<Class<?>[], EntrySerializer<?>>> types) {
333             this.begin = begin;
334             this.types = ImmutableList.copyOf(types);
335         }
336
337         public int begin() {
338             return begin;
339         }
340
341         public ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types() {
342             return types;
343         }
344
345         @Override
346         public String toString() {
347             return MoreObjects.toStringHelper(getClass()).add("begin", begin).add("types", types).toString();
348         }
349
350         @Override
351         public int hashCode() {
352             return types.hashCode();
353         }
354
355         // Only the registered types are used for equality.
356         @Override
357         public boolean equals(final Object obj) {
358             if (this == obj) {
359                 return true;
360             }
361
362             if (obj instanceof RegistrationBlock that) {
363                 return Objects.equals(types, that.types);
364             }
365             return false;
366         }
367     }
368 }