Modernize tests and fixup checkstyle
[controller.git] / atomix-storage / src / main / java / io / atomix / utils / serializer / KryoJournalSerdes.java
1 /*
2  * Copyright 2014-2021 Open Networking Foundation
3  * Copyright 2023 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.utils.serializer;
18
19 import static java.util.Objects.requireNonNull;
20
21 import com.esotericsoftware.kryo.Kryo;
22 import com.esotericsoftware.kryo.Registration;
23 import com.esotericsoftware.kryo.Serializer;
24 import com.esotericsoftware.kryo.io.ByteBufferInput;
25 import com.esotericsoftware.kryo.io.ByteBufferOutput;
26 import com.esotericsoftware.kryo.pool.KryoCallback;
27 import com.esotericsoftware.kryo.pool.KryoFactory;
28 import com.esotericsoftware.kryo.pool.KryoPool;
29 import com.google.common.base.MoreObjects;
30 import io.atomix.storage.journal.JournalSerdes;
31 import java.io.ByteArrayInputStream;
32 import java.io.InputStream;
33 import java.io.OutputStream;
34 import java.nio.ByteBuffer;
35 import java.util.Arrays;
36 import java.util.List;
37 import org.objenesis.strategy.StdInstantiatorStrategy;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Pool of Kryo instances, with classes pre-registered.
43  */
44 @Deprecated(forRemoval = true, since = "9.0.3")
45 final class KryoJournalSerdes implements JournalSerdes, KryoFactory, KryoPool {
46     /**
47      * Default buffer size used for serialization.
48      *
49      * @see #serialize(Object)
50      */
51     private static final int DEFAULT_BUFFER_SIZE = 4096;
52
53     /**
54      * Smallest ID free to use for user defined registrations.
55      */
56     private static final int INITIAL_ID = 16;
57
58     static final String NO_NAME = "(no name)";
59
60     private static final Logger LOGGER = LoggerFactory.getLogger(KryoJournalSerdes.class);
61
62     private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
63
64     private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
65     private final KryoInputPool kryoInputPool = new KryoInputPool();
66
67     private final List<RegisteredType> registeredTypes;
68     private final ClassLoader classLoader;
69     private final String friendlyName;
70
71     /**
72      * Creates a Kryo instance pool.
73      *
74      * @param registeredTypes      types to register
75      * @param registrationRequired whether registration is required
76      * @param friendlyName         friendly name for the namespace
77      */
78     KryoJournalSerdes(
79             final List<RegisteredType> registeredTypes,
80             final ClassLoader classLoader,
81             final String friendlyName) {
82         this.registeredTypes = List.copyOf(registeredTypes);
83         this.classLoader = requireNonNull(classLoader);
84         this.friendlyName = requireNonNull(friendlyName);
85
86         // Pre-populate with a single instance
87         release(create());
88     }
89
90     @Override
91     public byte[] serialize(final Object obj) {
92         return serialize(obj, DEFAULT_BUFFER_SIZE);
93     }
94
95     @Override
96     public byte[] serialize(final Object obj, final int bufferSize) {
97         return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
98             kryo.writeClassAndObject(output, obj);
99             output.flush();
100             return output.getByteArrayOutputStream().toByteArray();
101         }), bufferSize);
102     }
103
104     @Override
105     public void serialize(final Object obj, final ByteBuffer buffer) {
106         ByteBufferOutput out = new ByteBufferOutput(buffer);
107         Kryo kryo = borrow();
108         try {
109             kryo.writeClassAndObject(out, obj);
110             out.flush();
111         } finally {
112             release(kryo);
113         }
114     }
115
116     @Override
117     public void serialize(final Object obj, final OutputStream stream) {
118         serialize(obj, stream, DEFAULT_BUFFER_SIZE);
119     }
120
121     @Override
122     public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
123         ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
124         Kryo kryo = borrow();
125         try {
126             kryo.writeClassAndObject(out, obj);
127             out.flush();
128         } finally {
129             release(kryo);
130         }
131     }
132
133     @Override
134     public <T> T deserialize(final byte[] bytes) {
135         return kryoInputPool.run(input -> {
136             input.setInputStream(new ByteArrayInputStream(bytes));
137             return kryoPool.run(kryo -> {
138                 @SuppressWarnings("unchecked")
139                 T obj = (T) kryo.readClassAndObject(input);
140                 return obj;
141             });
142         }, DEFAULT_BUFFER_SIZE);
143     }
144
145     @Override
146     public <T> T deserialize(final ByteBuffer buffer) {
147         Kryo kryo = borrow();
148         try {
149             @SuppressWarnings("unchecked")
150             T obj = (T) kryo.readClassAndObject(new Kryo505ByteBufferInput(buffer));
151             return obj;
152         } finally {
153             release(kryo);
154         }
155     }
156
157     @Override
158     public <T> T deserialize(final InputStream stream) {
159         return deserialize(stream, DEFAULT_BUFFER_SIZE);
160     }
161
162     @Override
163     public <T> T deserialize(final InputStream stream, final int bufferSize) {
164         Kryo kryo = borrow();
165         try {
166             @SuppressWarnings("unchecked")
167             T obj = (T) kryo.readClassAndObject(new ByteBufferInput(stream, bufferSize));
168             return obj;
169         } finally {
170             release(kryo);
171         }
172     }
173
174     /**
175      * Creates a Kryo instance.
176      *
177      * @return Kryo instance
178      */
179     @Override
180     public Kryo create() {
181         LOGGER.trace("Creating Kryo instance for {}", this);
182         Kryo kryo = new Kryo();
183         kryo.setClassLoader(classLoader);
184         kryo.setRegistrationRequired(true);
185
186         // TODO rethink whether we want to use StdInstantiatorStrategy
187         kryo.setInstantiatorStrategy(
188             new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
189
190         int id = INITIAL_ID;
191         for (RegisteredType registeredType : registeredTypes) {
192             register(kryo, registeredType.types(), registeredType.serializer(), id++);
193         }
194         return kryo;
195     }
196
197     /**
198      * Register {@code type} and {@code serializer} to {@code kryo} instance.
199      *
200      * @param kryo       Kryo instance
201      * @param types      types to register
202      * @param serializer Specific serializer to register or null to use default.
203      * @param id         type registration id to use
204      */
205     private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
206         Registration existing = kryo.getRegistration(id);
207         if (existing != null) {
208             boolean matches = false;
209             for (Class<?> type : types) {
210                 if (existing.getType() == type) {
211                     matches = true;
212                     break;
213                 }
214             }
215
216             if (!matches) {
217                 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
218                     friendlyName, types, id, existing.getType());
219
220                 throw new IllegalStateException(String.format(
221                     "Failed to register %s as %s, %s was already registered.",
222                     Arrays.toString(types), id, existing.getType()));
223             }
224             // falling through to register call for now.
225             // Consider skipping, if there's reasonable
226             // way to compare serializer equivalence.
227         }
228
229         for (Class<?> type : types) {
230             Registration reg = null;
231             if (serializer == null) {
232                 reg = kryo.register(type, id);
233             } else if (type.isInterface()) {
234                 kryo.addDefaultSerializer(type, serializer);
235             } else {
236                 reg = kryo.register(type, serializer, id);
237             }
238             if (reg != null) {
239                 if (reg.getId() != id) {
240                     LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
241                         friendlyName, reg.getType(), reg.getId(), id);
242                 }
243                 LOGGER.trace("{} registered as {}", reg.getType(), reg.getId());
244             }
245         }
246     }
247
248     @Override
249     public Kryo borrow() {
250         return kryoPool.borrow();
251     }
252
253     @Override
254     public void release(final Kryo kryo) {
255         kryoPool.release(kryo);
256     }
257
258     @Override
259     public <T> T run(final KryoCallback<T> callback) {
260         return kryoPool.run(callback);
261     }
262
263     @Override
264     public String toString() {
265         if (!NO_NAME.equals(friendlyName)) {
266             return MoreObjects.toStringHelper(getClass())
267                 .omitNullValues()
268                 .add("friendlyName", friendlyName)
269                 // omit lengthy detail, when there's a name
270                 .toString();
271         }
272         return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
273     }
274 }