2 * Copyright 2014-2021 Open Networking Foundation
3 * Copyright 2023 PANTHEON.tech, s.r.o.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.utils.serializer;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
22 import com.esotericsoftware.kryo.Kryo;
23 import com.esotericsoftware.kryo.Registration;
24 import com.esotericsoftware.kryo.Serializer;
25 import com.esotericsoftware.kryo.io.ByteBufferInput;
26 import com.esotericsoftware.kryo.io.ByteBufferOutput;
27 import com.esotericsoftware.kryo.pool.KryoCallback;
28 import com.esotericsoftware.kryo.pool.KryoFactory;
29 import com.esotericsoftware.kryo.pool.KryoPool;
30 import com.google.common.base.MoreObjects;
31 import io.atomix.storage.journal.JournalSerdes;
32 import java.io.ByteArrayInputStream;
33 import java.io.InputStream;
34 import java.io.OutputStream;
35 import java.nio.ByteBuffer;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import org.objenesis.strategy.StdInstantiatorStrategy;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * Pool of Kryo instances, with classes pre-registered.
46 public final class Namespace implements JournalSerdes, KryoFactory, KryoPool {
48 * Default buffer size used for serialization.
50 * @see #serialize(Object)
52 private static final int DEFAULT_BUFFER_SIZE = 4096;
55 * Smallest ID free to use for user defined registrations.
57 private static final int INITIAL_ID = 16;
// Placeholder friendly name: Builder.build() passes it when the caller gives no name,
// and toString() compares against it to decide which representation to print.
59 private static final String NO_NAME = "(no name)";
61 private static final Logger LOGGER = LoggerFactory.getLogger(Namespace.class);
// Pool of Kryo instances. This object is handed to the pool builder, and the pool is
// configured with soft references.
63 private final KryoPool kryoPool = new KryoPool.Builder(this).softReferences().build();
// Pooled output/input objects used by the byte[]-based serialize() and deserialize() methods.
65 private final KryoOutputPool kryoOutputPool = new KryoOutputPool();
66 private final KryoInputPool kryoInputPool = new KryoInputPool();
// Copy of the registrations supplied at construction; create() iterates over it.
68 private final List<RegisteredType> registeredTypes;
// Class loader installed on every Kryo instance built by create().
69 private final ClassLoader classLoader;
// Name used in log messages and in toString().
70 private final String friendlyName;
73 * Creates a Kryo instance pool.
75 * @param registeredTypes types to register
76 * @param classLoader class loader to set on created Kryo instances
77 * @param friendlyName friendly name for the namespace
80 final List<RegisteredType> registeredTypes,
81 final ClassLoader classLoader,
82 final String friendlyName) {
// List.copyOf takes an unmodifiable snapshot, so later changes to the caller's list
// (the builder's mutable list) do not affect this namespace.
83 this.registeredTypes = List.copyOf(registeredTypes);
// Both remaining arguments are mandatory; null is rejected immediately.
84 this.classLoader = requireNonNull(classLoader);
85 this.friendlyName = requireNonNull(friendlyName);
87 // Pre-populate with a single instance
92 * Creates a new {@link Namespace} builder.
// Static entry point: the builder is exposed through the JournalSerdes.Builder interface type,
// while the implementing nested Builder class stays private.
96 public static JournalSerdes.Builder builder() {
/**
 * Serializes {@code obj} to a byte array by delegating to {@link #serialize(Object, int)}
 * with {@link #DEFAULT_BUFFER_SIZE}.
 *
 * @param obj object to serialize
 * @return serialized bytes
 */
101 public byte[] serialize(final Object obj) {
102 return serialize(obj, DEFAULT_BUFFER_SIZE);
/**
 * Serializes {@code obj}, together with its class information, into a new byte array.
 *
 * @param obj object to serialize
 * @param bufferSize NOTE(review): not referenced in the lines visible here; presumably the
 *                   buffer size handed to the output pool - confirm
 * @return serialized bytes
 */
106 public byte[] serialize(final Object obj, final int bufferSize) {
// Run with a pooled output and a pooled Kryo instance.
107 return kryoOutputPool.run(output -> kryoPool.run(kryo -> {
// The class is written ahead of the object, which is what lets deserialize() return an
// arbitrary registered type without being told which one to expect.
108 kryo.writeClassAndObject(output, obj);
// Copy out whatever has accumulated in the output's backing byte-array stream.
110 return output.getByteArrayOutputStream().toByteArray();
/**
 * Serializes {@code obj}, together with its class information, into {@code buffer}.
 *
 * @param obj object to serialize
 * @param buffer destination buffer; it is wrapped in a {@code ByteBufferOutput}
 */
115 public void serialize(final Object obj, final ByteBuffer buffer) {
116 ByteBufferOutput out = new ByteBufferOutput(buffer);
// Take a Kryo instance from the pool; see borrow().
117 Kryo kryo = borrow();
119 kryo.writeClassAndObject(out, obj);
/**
 * Serializes {@code obj} to {@code stream} by delegating to
 * {@link #serialize(Object, OutputStream, int)} with {@link #DEFAULT_BUFFER_SIZE}.
 *
 * @param obj object to serialize
 * @param stream destination stream
 */
127 public void serialize(final Object obj, final OutputStream stream) {
128 serialize(obj, stream, DEFAULT_BUFFER_SIZE);
/**
 * Serializes {@code obj}, together with its class information, to {@code stream}.
 *
 * @param obj object to serialize
 * @param stream destination stream
 * @param bufferSize buffer size given to the {@code ByteBufferOutput} wrapping the stream
 */
132 public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
133 ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
// Take a Kryo instance from the pool; see borrow().
134 Kryo kryo = borrow();
136 kryo.writeClassAndObject(out, obj);
/**
 * Deserializes an object from a byte array.
 *
 * @param bytes serialized form, as produced with class information included
 * @param <T> type the caller expects; the cast to it is unchecked
 * @return the deserialized object
 */
144 public <T> T deserialize(final byte[] bytes) {
145 return kryoInputPool.run(input -> {
// Point the pooled input at the supplied bytes.
146 input.setInputStream(new ByteArrayInputStream(bytes));
147 return kryoPool.run(kryo -> {
// Unchecked: the runtime type is whichever class was recorded in the data.
148 @SuppressWarnings("unchecked")
149 T obj = (T) kryo.readClassAndObject(input);
// DEFAULT_BUFFER_SIZE is the second argument of the enclosing pool call
// (kryoPool.run takes only the callback, so this closes kryoInputPool.run).
152 }, DEFAULT_BUFFER_SIZE);
/**
 * Deserializes an object from {@code buffer}.
 *
 * @param buffer source buffer; it is wrapped in a {@code ByteBufferInput}
 * @param <T> type the caller expects; the cast to it is unchecked
 * @return the deserialized object
 */
156 public <T> T deserialize(final ByteBuffer buffer) {
157 ByteBufferInput in = new ByteBufferInput(buffer);
// Take a Kryo instance from the pool; see borrow().
158 Kryo kryo = borrow();
160 @SuppressWarnings("unchecked")
161 T obj = (T) kryo.readClassAndObject(in);
/**
 * Deserializes an object from {@code stream} by delegating to
 * {@link #deserialize(InputStream, int)} with {@link #DEFAULT_BUFFER_SIZE}.
 *
 * @param stream source stream
 * @param <T> type the caller expects
 * @return the deserialized object
 */
169 public <T> T deserialize(final InputStream stream) {
170 return deserialize(stream, DEFAULT_BUFFER_SIZE);
/**
 * Deserializes an object from {@code stream}.
 *
 * @param stream source stream
 * @param bufferSize buffer size given to the {@code ByteBufferInput} wrapping the stream
 * @param <T> type the caller expects; the cast to it is unchecked
 * @return the deserialized object
 */
174 public <T> T deserialize(final InputStream stream, final int bufferSize) {
175 ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
// Take a Kryo instance from the pool; see borrow().
176 Kryo kryo = borrow();
178 @SuppressWarnings("unchecked")
179 T obj = (T) kryo.readClassAndObject(in);
187 * Creates a Kryo instance.
189 * @return Kryo instance
192 public Kryo create() {
193 LOGGER.trace("Creating Kryo instance for {}", this);
194 Kryo kryo = new Kryo();
195 kryo.setClassLoader(classLoader);
// Registration is always required: only classes registered below can be handled.
196 kryo.setRegistrationRequired(true);
198 // TODO rethink whether we want to use StdInstantiatorStrategy
// Kryo's default instantiation strategy, with Objenesis' StdInstantiatorStrategy supplied
// as its fallback.
199 kryo.setInstantiatorStrategy(
200 new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Each RegisteredType consumes one id, in list order.
// NOTE(review): the declaration of id is not visible here; presumably it starts at
// INITIAL_ID - confirm.
203 for (RegisteredType registeredType : registeredTypes) {
204 register(kryo, registeredType.types(), registeredType.serializer(), id++);
210 * Register {@code type} and {@code serializer} to {@code kryo} instance.
212 * @param kryo Kryo instance
213 * @param types types to register
214 * @param serializer Specific serializer to register or null to use default.
215 * @param id type registration id to use
217 private void register(final Kryo kryo, final Class<?>[] types, final Serializer<?> serializer, final int id) {
// Look up whatever is currently registered under the requested id.
218 Registration existing = kryo.getRegistration(id);
219 if (existing != null) {
220 boolean matches = false;
// Identity comparison: the existing registration only counts as a match when it is for
// one of the exact classes being registered.
221 for (Class<?> type : types) {
222 if (existing.getType() == type) {
// Conflict report: logged with this namespace's friendly name, then raised as an
// IllegalStateException carrying the same details.
// NOTE(review): the condition guarding this report is not visible here; presumably it is
// reached only when no type matched - confirm.
229 LOGGER.error("{}: Failed to register {} as {}, {} was already registered.",
230 friendlyName, types, id, existing.getType());
232 throw new IllegalStateException(String.format(
233 "Failed to register %s as %s, %s was already registered.",
234 Arrays.toString(types), id, existing.getType()));
236 // falling through to register call for now.
237 // Consider skipping, if there's reasonable
238 // way to compare serializer equivalence.
// Every class of the group is registered under the same id.
241 for (Class<?> type : types) {
242 Registration r = null;
243 if (serializer == null) {
// No explicit serializer: let Kryo pick its default for the type.
244 r = kryo.register(type, id);
245 } else if (type.isInterface()) {
// Interfaces get a default serializer instead of an id registration, so r stays null
// on this path.
246 kryo.addDefaultSerializer(type, serializer);
248 r = kryo.register(type, serializer, id);
// Kryo handed back a registration whose id differs from the requested one.
251 if (r.getId() != id) {
252 LOGGER.debug("{}: {} already registered as {}. Skipping {}.",
253 friendlyName, r.getType(), r.getId(), id);
255 LOGGER.trace("{} registered as {}", r.getType(), r.getId());
/**
 * Takes a Kryo instance from the internal pool.
 *
 * @return Kryo instance obtained from {@code kryoPool}
 */
261 public Kryo borrow() {
262 return kryoPool.borrow();
/**
 * Hands a Kryo instance to the internal pool.
 *
 * @param kryo instance to release to {@code kryoPool}
 */
266 public void release(final Kryo kryo) {
267 kryoPool.release(kryo);
/**
 * Runs {@code callback} through the internal pool and returns its result.
 *
 * @param callback work to execute via {@code kryoPool}
 * @param <T> result type of the callback
 * @return whatever {@code kryoPool.run(callback)} returns
 */
271 public <T> T run(final KryoCallback<T> callback) {
272 return kryoPool.run(callback);
// Two representations: a namespace with a real name prints the name, while one built with
// the NO_NAME placeholder prints its registered types instead.
276 public String toString() {
277 if (!NO_NAME.equals(friendlyName)) {
278 return MoreObjects.toStringHelper(getClass())
280 .add("friendlyName", friendlyName)
281 // omit lengthy detail, when there's a name
// Unnamed namespace: the registered types are the only identifying detail available.
284 return MoreObjects.toStringHelper(getClass()).add("registeredTypes", registeredTypes).toString();
// One registration entry: a serializer paired with the group of classes it handles.
// create() passes each entry's types and serializer to register() with a single id.
287 private static record RegisteredType(EntrySerializer<?> serializer, Class<?>[] types) {
// Both components are mandatory.
289 requireNonNull(serializer);
290 requireNonNull(types);
// Private JournalSerdes.Builder implementation that collects registrations and produces a Namespace.
294 private static final class Builder implements JournalSerdes.Builder {
// Registrations in the order register() was called.
295 private final List<RegisteredType> types = new ArrayList<>();
// Defaults to the context class loader of the thread that creates the builder.
296 private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// Wraps the serdes in an EntrySerializer and records it together with its classes.
299 public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
300 types.add(new RegisteredType(new EntrySerializer<>(serdes), classes));
// Replaces the default class loader; null is rejected.
305 public Builder setClassLoader(final ClassLoader classLoader) {
306 this.classLoader = requireNonNull(classLoader);
// Builds with the NO_NAME placeholder as the friendly name.
311 public JournalSerdes build() {
312 return build(NO_NAME);
316 public JournalSerdes build(final String friendlyName) {
// Refuse to build a namespace with nothing registered.
317 checkState(!types.isEmpty(), "No serializers registered");
318 return new Namespace(types, classLoader, friendlyName);