f2cfbb4f199af38ca7d10eabcfe1557e51d5b3c5
[controller.git] / opendaylight / md-sal / sal-akka-segmented-journal / src / main / java / org / opendaylight / controller / akka / segjournal / DataJournalEntrySerializer.java
1 /*
2  * Copyright (c) 2019 Pantheon Technologies, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.akka.segjournal;
9
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSystem;
14 import akka.actor.ExtendedActorSystem;
15 import akka.persistence.PersistentRepr;
16 import io.atomix.storage.journal.JournalSerdes.EntryInput;
17 import io.atomix.storage.journal.JournalSerdes.EntryOutput;
18 import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
19 import java.io.IOException;
20 import java.util.concurrent.Callable;
21 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence;
22 import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence;
23
24 /**
25  * Kryo serializer for {@link DataJournalEntry}. Each {@link SegmentedJournalActor} has its own instance, as well as
26  * a nested JavaSerializer to handle the payload.
27  *
28  * <p>
29  * Since we are persisting only parts of {@link PersistentRepr}, this class asymmetric by design:
30  * {@link #write(EntryOutput, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
31  * around a {@link PersistentRepr}, while {@link #read(EntryInput)} produces an {@link FromPersistence}, which
32  * needs further processing to reconstruct a {@link PersistentRepr}.
33  *
34  * @author Robert Varga
35  */
36 final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry> {
37     private final ExtendedActorSystem actorSystem;
38
39     DataJournalEntrySerializer(final ActorSystem actorSystem) {
40         this.actorSystem = requireNonNull((ExtendedActorSystem) actorSystem);
41     }
42
43     @Override
44     public void write(final EntryOutput output, final DataJournalEntry entry) throws IOException {
45         verify(entry instanceof ToPersistence);
46         final PersistentRepr repr = ((ToPersistence) entry).repr();
47         output.writeString(repr.manifest());
48         output.writeString(repr.writerUuid());
49         output.writeObject(repr.payload());
50     }
51
52     @Override
53     public DataJournalEntry read(final EntryInput input) throws IOException {
54         final String manifest = input.readString();
55         final String uuid = input.readString();
56         final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
57             (Callable<Object>) input::readObject);
58         return new FromPersistence(manifest, uuid, payload);
59     }
60 }