X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=663d400b53b3e3f7a589a08b819f930572d4c753;hb=refs%2Fchanges%2F72%2F50572%2F9;hp=ff57bfd1d5ae21db8f8ae1646c9a8f6e1f08b3a5;hpb=348a37f613ef444b10a0e65b400390396552fc48;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index ff57bfd1d5..663d400b53 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -32,14 +32,22 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.util.StringIdentifier; +import org.opendaylight.yangtools.util.AbstractStringIdentifier; /** * A sample actor showing how the RaftActor is to be extended */ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + private static final class PayloadIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; + + PayloadIdentifier(final long identifier) { + super(String.valueOf(identifier)); + } + } private final Map state = new HashMap<>(); @@ -63,7 +71,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, protected void handleNonRaftCommand(Object message) { if(message instanceof KeyValue){ if(isLeader()) { - persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message); + persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); } else { if(getLeader() != null) { getLeader().forward(message, getContext()); @@ -124,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - bs = fromObject(state); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); + } } catch (Exception e) { LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); + + getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State snapshotState) { state.clear(); try { - state.putAll((HashMap) toObject(snapshot)); + state.putAll(((MapState)snapshotState).state); } catch (Exception e) { LOG.error("Exception in applying snapshot", e); } @@ -147,45 +157,6 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } } - private static ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } - - private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - @Override protected void onStateChanged() { } @@ -217,7 +188,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] snapshot) { + public void applyRecoverySnapshot(Snapshot.State snapshotState) { } @Override @@ -229,4 +200,29 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, public byte[] getRestoreFromSnapshot() { return null; } + + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return deserializePreCarbonSnapshot(snapshotBytes.read()); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { + return new MapState((Map) SerializationUtils.deserialize(from)); + } + + private static class MapState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + Map state; + + MapState(Map state) { + this.state = state; + } + } }