2 * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.akka.segjournal;
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSystem;
14 import akka.actor.ExtendedActorSystem;
15 import akka.persistence.PersistentRepr;
16 import com.esotericsoftware.kryo.Kryo;
17 import com.esotericsoftware.kryo.Serializer;
18 import com.esotericsoftware.kryo.io.Input;
19 import com.esotericsoftware.kryo.io.Output;
20 import com.esotericsoftware.kryo.serializers.JavaSerializer;
21 import java.util.concurrent.Callable;
22 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
23 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
26 * Kryo serializer for {@link DataJournalEntry}. Each {@link SegmentedJournalActor} has its own instance, as well as
27 * a nested JavaSerializer to handle the payload.
30 * Since we are persisting only parts of {@link PersistentRepr}, this class asymmetric by design:
31 * {@link #write(Kryo, Output, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
32 * around a {@link PersistentRepr}, while {@link #read(Kryo, Input, Class)} produces an {@link FromPersistence}, which
33 * needs further processing to reconstruct a {@link PersistentRepr}.
35 * @author Robert Varga
37 final class DataJournalEntrySerializer extends Serializer<DataJournalEntry> {
38 private final JavaSerializer serializer = new JavaSerializer();
39 private final ExtendedActorSystem actorSystem;
41 DataJournalEntrySerializer(final ActorSystem actorSystem) {
42 this.actorSystem = requireNonNull((ExtendedActorSystem) actorSystem);
46 public void write(final Kryo kryo, final Output output, final DataJournalEntry object) {
47 verify(object instanceof ToPersistence);
48 final PersistentRepr repr = ((ToPersistence) object).repr();
49 output.writeString(repr.manifest());
50 output.writeString(repr.writerUuid());
51 serializer.write(kryo, output, repr.payload());
55 public DataJournalEntry read(final Kryo kryo, final Input input, final Class<DataJournalEntry> type) {
56 final String manifest = input.readString();
57 final String uuid = input.readString();
58 final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
59 (Callable<Object>)() -> serializer.read(kryo, input, type));
60 return new FromPersistence(manifest, uuid, payload);