Rename Namespace to KryoJournalSerdes
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / utils / serializer / KryoJournalSerdes.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import io.atomix.storage.journal.JournalSerdes;
31 import java.io.ByteArrayInputStream;
32 import java.io.InputStream;
33 import java.io.OutputStream;
34 import java.nio.ByteBuffer;
35 import java.util.Arrays;
36 import java.util.List;
37 import org.objenesis.strategy.StdInstantiatorStrategy;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Pool of Kryo instances, with classes pre-registered.
43  */
44 final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
45     /**
46      * Default buffer size used for serialization.
47      *
48      * @see #serialize(Object)
49      */
50     private static final int DEFAULT_BUFFER_SIZE = 4096;
51
52     /**
53      * Smallest ID free to use for user defined registrations.
54      */
55     private static final int INITIAL_ID = 16;
56
57     static final String NO_NAME = "(no name)";
58
59     private static final Logger LOGGER = LoggerFactory.getLogger(KryoJournalSerdes.class);
60
61     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
62
63     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
64     private final KryoInputPool kryoInputPool = new KryoInputPool();
65
66     private final List<RegisteredType> registeredTypes;
67     private final ClassLoader classLoader;
68     private final String friendlyName;
69
70     /**
71      * Creates a Kryo instance pool.
72      *
73      * @param registeredTypes      types to register
74      * @param registrationRequired whether registration is required
75      * @param friendlyName         friendly name for the namespace
76      */
77     KryoJournalSerdes(
78             final List<RegisteredType> registeredTypes,
79             final ClassLoader classLoader,
80             final String friendlyName) {
81         this.registeredTypes = List.copyOf(registeredTypes);
82         this.classLoader = requireNonNull(classLoader);
83         this.friendlyName = requireNonNull(friendlyName);
84
85         // Pre-populate with a single instance
86         release(create());
87     }
88
89     @Override
90     public byte[] serialize(final Object obj) {
91         return serialize(obj, DEFAULT_BUFFER_SIZE);
92     }
93
94     @Override
95     public byte[] serialize(final Object obj, final int bufferSize) {
96         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
97             kryo.writeClassAndObject(output, obj);
98             output.flush();
99             return output.getByteArrayOutputStream().toByteArray();
100         }), bufferSize);
101     }
102
103     @Override
104     public void serialize(final Object obj, final ByteBuffer buffer) {
105         ByteBufferOutput out = new ByteBufferOutput(buffer);
106         Kryo kryo = borrow();
107         try {
108             kryo.writeClassAndObject(out, obj);
109             out.flush();
110         } finally {
111             release(kryo);
112         }
113     }
114
115     @Override
116     public void serialize(final Object obj, final OutputStream stream) {
117         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
118     }
119
120     @Override
121     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
122         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
123         Kryo kryo = borrow();
124         try {
125             kryo.writeClassAndObject(out, obj);
126             out.flush();
127         } finally {
128             release(kryo);
129         }
130     }
131
132     @Override
133     public <T> T deserialize(final byte[] bytes) {
134         return kryoInputPool.run(input -> {
135             input.setInputStream(new ByteArrayInputStream(bytes));
136             return kryoPool.run(kryo -> {
137                 @SuppressWarnings("unchecked")
138                 T obj = (T) kryo.readClassAndObject(input);
139                 return obj;
140             });
141         }, DEFAULT_BUFFER_SIZE);
142     }
143
144     @Override
145     public <T> T deserialize(final ByteBuffer buffer) {
146         ByteBufferInput in = new ByteBufferInput(buffer);
147         Kryo kryo = borrow();
148         try {
149             @SuppressWarnings("unchecked")
150             T obj = (T) kryo.readClassAndObject(in);
151             return obj;
152         } finally {
153             release(kryo);
154         }
155     }
156
157     @Override
158     public <T> T deserialize(final InputStream stream) {
159         return deserialize(stream, DEFAULT_BUFFER_SIZE);
160     }
161
162     @Override
163     public <T> T deserialize(final InputStream stream, final int bufferSize) {
164         ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
165         Kryo kryo = borrow();
166         try {
167             @SuppressWarnings("unchecked")
168             T obj = (T) kryo.readClassAndObject(in);
169             return obj;
170         } finally {
171             release(kryo);
172         }
173     }
174
175     /**
176      * Creates a Kryo instance.
177      *
178      * @return Kryo instance
179      */
180     @Override
181     public Kryo create() {
182         LOGGER.trace("Creating Kryo instance for {}", this);
183         Kryo kryo = new Kryo();
184         kryo.setClassLoader(classLoader);
185         kryo.setRegistrationRequired(true);
186
187         // TODO rethink whether we want to use StdInstantiatorStrategy
188         kryo.setInstantiatorStrategy(
189             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
190
191         int id = INITIAL_ID;
192         for (RegisteredType registeredType : registeredTypes) {
193             register(kryo, registeredType.types(), registeredType.serializer(), id++);
194         }
195         return kryo;
196     }
197
198     /**
199      * Register {@code type} and {@code serializer} to {@code kryo} instance.
200      *
201      * @param kryo       Kryo instance
202      * @param types      types to register
203      * @param serializer Specific serializer to register or null to use default.
204      * @param id         type registration id to use
205      */
206     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
207         Registration existing = kryo.getRegistration(id);
208         if (existing != null) {
209             boolean matches = false;
210             for (Class<?> type : types) {
211                 if (existing.getType() == type) {
212                     matches = true;
213                     break;
214                 }
215             }
216
217             if (!matches) {
218                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
219                     friendlyName, types, id, existing.getType());
220
221                 throw new IllegalStateException(String.format(
222                     "Failed to register %s as %s, %s was already registered.",
223                     Arrays.toString(types), id, existing.getType()));
224             }
225             // falling through to register call for now.
226             // Consider skipping, if there's reasonable
227             // way to compare serializer equivalence.
228         }
229
230         for (Class<?> type : types) {
231             Registration r = null;
232             if (serializer == null) {
233                 r = kryo.register(type, id);
234             } else if (type.isInterface()) {
235                 kryo.addDefaultSerializer(type, serializer);
236             } else {
237                 r = kryo.register(type, serializer, id);
238             }
239             if (r != null) {
240                 if (r.getId() != id) {
241                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
242                         friendlyName, r.getType(), r.getId(), id);
243                 }
244                 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
245             }
246         }
247     }
248
249     @Override
250     public Kryo borrow() {
251         return kryoPool.borrow();
252     }
253
254     @Override
255     public void release(final Kryo kryo) {
256         kryoPool.release(kryo);
257     }
258
259     @Override
260     public <T> T run(final KryoCallback<T> callback) {
261         return kryoPool.run(callback);
262     }
263
264     @Override
265     public String toString() {
266         if (!NO_NAME.equals(friendlyName)) {
267             return MoreObjects.toStringHelper(getClass())
268                 .omitNullValues()
269                 .add("friendlyName", friendlyName)
270                 // omit lengthy detail, when there's a name
271                 .toString();
272         }
273         return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
274     }
275 }