import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.persistence.PersistentRepr;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import io.atomix.storage.journal.JournalSerdes.EntryInput;
+import io.atomix.storage.journal.JournalSerdes.EntryOutput;
+import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
+import java.io.IOException;
import java.util.concurrent.Callable;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
*
* <p>
* Since we are persisting only parts of {@link PersistentRepr}, this class asymmetric by design:
- * {@link #write(Kryo, Output, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
- * around a {@link PersistentRepr}, while {@link #read(Kryo, Input, Class)} produces an {@link FromPersistence}, which
+ * {@link #write(EntryOutput, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
+ * around a {@link PersistentRepr}, while {@link #read(EntryInput)} produces an {@link FromPersistence}, which
* needs further processing to reconstruct a {@link PersistentRepr}.
*
* @author Robert Varga
*/
-final class DataJournalEntrySerializer extends Serializer<DataJournalEntry> {
- private final JavaSerializer serializer = new JavaSerializer();
+final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry> {
private final ExtendedActorSystem actorSystem;
DataJournalEntrySerializer(final ActorSystem actorSystem) {
}
@Override
- public void write(final Kryo kryo, final Output output, final DataJournalEntry object) {
- verify(object instanceof ToPersistence);
- final PersistentRepr repr = ((ToPersistence) object).repr();
+ public void write(final EntryOutput output, final DataJournalEntry entry) throws IOException {
+ verify(entry instanceof ToPersistence);
+ final PersistentRepr repr = ((ToPersistence) entry).repr();
output.writeString(repr.manifest());
output.writeString(repr.writerUuid());
- serializer.write(kryo, output, repr.payload());
+ output.writeObject(repr.payload());
}
@Override
- public DataJournalEntry read(final Kryo kryo, final Input input, final Class<DataJournalEntry> type) {
+ public DataJournalEntry read(final EntryInput input) throws IOException {
final String manifest = input.readString();
final String uuid = input.readString();
final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
- (Callable<Object>)() -> serializer.read(kryo, input, type));
+ (Callable<Object>) input::readObject);
return new FromPersistence(manifest, uuid, payload);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.akka.segjournal;
+
+import io.atomix.storage.journal.JournalSerdes.EntryInput;
+import io.atomix.storage.journal.JournalSerdes.EntryOutput;
+import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
+import java.io.IOException;
+
+final class LongSerdes implements EntrySerdes<Long> {
+ @Override
+ public Long read(final EntryInput input) throws IOException {
+ return input.readLong();
+ }
+
+ @Override
+ public void write(final EntryOutput output, final Long entry) throws IOException {
+ output.writeLong(entry);
+ }
+}
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
-import com.esotericsoftware.kryo.serializers.DefaultSerializers.LongSerializer;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import io.atomix.storage.journal.Indexed;
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
- .register(new LongSerializer(), Long.class)
+ .register(new LongSerdes(), Long.class)
.build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
<configuration>
<instructions>
<Export-Package>
- io.atomix.storage.journal,
- com.esotericsoftware.kryo.*;version=4.0.2
+ io.atomix.storage.journal
</Export-Package>
<Import-Package>
sun.nio.ch;resolution:=optional,
*/
package io.atomix.storage.journal;
-import com.esotericsoftware.kryo.Serializer;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import io.atomix.utils.serializer.Namespace;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
* When multiple classes are registered with an explicitly provided serializer, the namespace guarantees
* all instances will be serialized with the same type ID.
*
- * @param classes list of classes to register
- * @param serializer serializer to use for the class
+ * @param classes list of classes to register
+ * @param serdes serializer to use for the class
* @return this builder
*/
- Builder register(Serializer<?> serializer, Class<?>... classes);
+ Builder register(EntrySerdes<?> serdes, Class<?>... classes);
/**
* Sets the namespace class loader.
*/
Builder setClassLoader(ClassLoader classLoader);
}
+
+ /**
+ * Input data stream exposed to {@link EntrySerdes#read(EntryInput)}.
+ */
+ @Beta
+ interface EntryInput {
+
+ byte[] readBytes(int length) throws IOException;
+
+ long readLong() throws IOException;
+
+ String readString() throws IOException;
+
+ Object readObject() throws IOException;
+
+ @VisibleForTesting
+ int readVarInt() throws IOException;
+ }
+
+ /**
+ * Output data stream exposed to {@link EntrySerdes#write(EntryOutput, Object)}.
+ */
+ @Beta
+ interface EntryOutput {
+
+ void writeBytes(byte[] bytes) throws IOException;
+
+ void writeLong(long value) throws IOException;
+
+ void writeObject(Object value) throws IOException;
+
+ void writeString(String value) throws IOException;
+
+ @VisibleForTesting
+ void writeVarInt(int value) throws IOException;
+ }
+
+ /**
+ * A serializer/deserializer for an entry.
+ *
+ * @param <T> Entry type
+ */
+ interface EntrySerdes<T> {
+
+ T read(EntryInput input) throws IOException;
+
+ void write(EntryOutput output, T entry) throws IOException;
+ }
}
--- /dev/null
+/*
+ * Copyright 2023 PANTHEON.tech, s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.utils.serializer;
+
+import static java.util.Objects.requireNonNull;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
+import java.io.IOException;
+
+final class EntrySerializer<T> extends Serializer<T> {
+ // Note: uses identity to create things in Kryo, hence we want an instance for every serdes we wrap
+ private final JavaSerializer javaSerializer = new JavaSerializer();
+ private final EntrySerdes<T> serdes;
+
+ EntrySerializer(final EntrySerdes<T> serdes) {
+ this.serdes = requireNonNull(serdes);
+ }
+
+ @Override
+ public T read(final Kryo kryo, final Input input, final Class<T> type) {
+ try {
+ return serdes.read(new KryoEntryInput(kryo, input, javaSerializer));
+ } catch (IOException e) {
+ throw new KryoException(e);
+ }
+ }
+
+ @Override
+ public void write(final Kryo kryo, final Output output, final T object) {
+ try {
+ serdes.write(new KryoEntryOutput(kryo, output, javaSerializer), object);
+ } catch (IOException e) {
+ throw new KryoException(e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright 2023 PANTHEON.tech, s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.utils.serializer;
+
+import static java.util.Objects.requireNonNull;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import io.atomix.storage.journal.JournalSerdes.EntryInput;
+import java.io.IOException;
+
+final class KryoEntryInput implements EntryInput {
+ private final Kryo kryo;
+ private final Input input;
+ private final JavaSerializer javaSerializer;
+
+ KryoEntryInput(final Kryo kryo, final Input input, final JavaSerializer javaSerializer) {
+ this.kryo = requireNonNull(kryo);
+ this.input = requireNonNull(input);
+ this.javaSerializer = requireNonNull(javaSerializer);
+ }
+
+ @Override
+ public byte[] readBytes(final int length) throws IOException {
+ try {
+ return input.readBytes(length);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ try {
+ return input.readLong(false);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Object readObject() throws IOException {
+ try {
+ return javaSerializer.read(kryo, input, null);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public String readString() throws IOException {
+ try {
+ return input.readString();
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int readVarInt() throws IOException {
+ try {
+ return input.readVarInt(true);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright 2023 PANTHEON.tech, s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.utils.serializer;
+
+import static java.util.Objects.requireNonNull;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import io.atomix.storage.journal.JournalSerdes.EntryOutput;
+import java.io.IOException;
+
+final class KryoEntryOutput implements EntryOutput {
+ private final Kryo kryo;
+ private final Output output;
+ private final JavaSerializer javaSerializer;
+
+ KryoEntryOutput(final Kryo kryo, final Output output, final JavaSerializer javaSerializer) {
+ this.kryo = requireNonNull(kryo);
+ this.output = requireNonNull(output);
+ this.javaSerializer = requireNonNull(javaSerializer);
+ }
+
+ @Override
+ public void writeBytes(final byte[] bytes) throws IOException {
+ try {
+ output.writeBytes(bytes);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeLong(final long value) throws IOException {
+ try {
+ output.writeLong(value, false);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeObject(final Object value) throws IOException {
+ try {
+ javaSerializer.write(kryo, output, value);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeString(final String value) throws IOException {
+ try {
+ output.writeString(value);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void writeVarInt(final int value) throws IOException {
+ try {
+ output.writeVarInt(value, true);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ }
+ }
+}
*/
private static final class Builder implements JournalSerdes.Builder {
private final int blockHeadId = INITIAL_ID;
- private final List<Entry<Class<?>[], Serializer<?>>> types = new ArrayList<>();
+ private final List<Entry<Class<?>[], EntrySerializer<?>>> types = new ArrayList<>();
private final List<RegistrationBlock> blocks = new ArrayList<>();
private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@Override
- public Builder register(final Serializer<?> serializer, final Class<?>... classes) {
- types.add(Map.entry(classes, serializer));
+ public Builder register(final EntrySerdes<?> serdes, final Class<?>... classes) {
+ types.add(Map.entry(classes, new EntrySerializer<>(serdes)));
return this;
}
if (id == FLOATING_ID) {
id = kryo.getNextRegistrationId();
}
- for (Entry<Class<?>[], Serializer<?>> entry : block.types()) {
+ for (Entry<Class<?>[], EntrySerializer<?>> entry : block.types()) {
register(kryo, entry.getKey(), entry.getValue(), id++);
}
}
static final class RegistrationBlock {
private final int begin;
- private final ImmutableList<Entry<Class<?>[], Serializer<?>>> types;
+ private final ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types;
- RegistrationBlock(final int begin, final List<Entry<Class<?>[], Serializer<?>>> types) {
+ RegistrationBlock(final int begin, final List<Entry<Class<?>[], EntrySerializer<?>>> types) {
this.begin = begin;
this.types = ImmutableList.copyOf(types);
}
return begin;
}
- public ImmutableList<Entry<Class<?>[], Serializer<?>>> types() {
+ public ImmutableList<Entry<Class<?>[], EntrySerializer<?>>> types() {
return types;
}
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@RunWith(Parameterized.class)
public abstract class AbstractJournalTest {
private static final JournalSerdes NAMESPACE = JournalSerdes.builder()
- .register(new TestEntrySerializer(), TestEntry.class)
- .register(new ByteArraySerializer(), byte[].class)
+ .register(new TestEntrySerdes(), TestEntry.class)
+ .register(new ByteArraySerdes(), byte[].class)
.build();
protected static final TestEntry ENTRY = new TestEntry(32);
--- /dev/null
+/*
+ * Copyright 2023 PANTHEON.tech, s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.atomix.storage.journal.JournalSerdes.EntryInput;
+import io.atomix.storage.journal.JournalSerdes.EntryOutput;
+import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
+import java.io.IOException;
+
+final class ByteArraySerdes implements EntrySerdes<byte[]> {
+ @Override
+ public byte[] read(final EntryInput input) throws IOException {
+ int length = input.readVarInt();
+ return length == 0 ? null : input.readBytes(length - 1);
+ }
+
+ @Override
+ public void write(final EntryOutput output, final byte[] entry) throws IOException {
+ if (entry != null) {
+ output.writeVarInt(entry.length + 1);
+ output.writeBytes(entry);
+ } else {
+ output.writeVarInt(0);
+ }
+ }
+}
*/
package io.atomix.storage.journal;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
+import io.atomix.storage.journal.JournalSerdes.EntryInput;
+import io.atomix.storage.journal.JournalSerdes.EntryOutput;
+import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
+import java.io.IOException;
-class TestEntrySerializer extends Serializer<TestEntry> {
- private static final ByteArraySerializer BA_SERIALIZER = new ByteArraySerializer();
+final class TestEntrySerdes implements EntrySerdes<TestEntry> {
+ private static final ByteArraySerdes BA_SERIALIZER = new ByteArraySerdes();
@Override
- public void write(Kryo kryo, Output output, TestEntry object) {
- kryo.writeObjectOrNull(output, object.bytes(), BA_SERIALIZER);
+ public TestEntry read(final EntryInput input) throws IOException {
+ return new TestEntry(BA_SERIALIZER.read(input));
}
@Override
- public TestEntry read(Kryo kryo, Input input, Class<TestEntry> type) {
- return new TestEntry(kryo.readObjectOrNull(input, byte[].class, BA_SERIALIZER));
+ public void write(final EntryOutput output, final TestEntry entry) throws IOException {
+ BA_SERIALIZER.write(output, entry.bytes());
}
}