X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Factors%2FShardSnapshotActor.java;h=4796e1c37f466cb4c2c94c281dbcc4ab0275b47c;hb=2634ed7138a343f051ff6452ccc7edd3abfc0c3a;hp=00bb424850027bd211b560edb025386da0894b00;hpb=b70b396725749d3fd6ca761f02f4b630f6f4f1ce;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java index 00bb424850..4796e1c37f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java @@ -10,8 +10,13 @@ package org.opendaylight.controller.cluster.datastore.actors; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; /** @@ -24,10 +29,13 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { // Internal message private static final class SerializeSnapshot { private final ShardDataTreeSnapshot snapshot; + private final Optional installSnapshotStream; private final ActorRef replyTo; - SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) { + SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional installSnapshotStream, + final ActorRef replyTo) { this.snapshot = Preconditions.checkNotNull(snapshot); + this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream); this.replyTo = Preconditions.checkNotNull(replyTo); } @@ -35,6 +43,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { return snapshot; } + Optional getInstallSnapshotStream() { + return installSnapshotStream; + } + ActorRef getReplyTo() { return replyTo; } @@ -50,16 +62,39 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { @Override protected void handleReceive(final Object message) throws Exception { if (message instanceof SerializeSnapshot) { - final SerializeSnapshot request = (SerializeSnapshot) message; - request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender()); + onSerializeSnapshot((SerializeSnapshot) message); } else { unknownMessage(message); } } - public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot, - final ActorRef replyTo) { - snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender()); + private void onSerializeSnapshot(final SerializeSnapshot request) { + Optional installSnapshotStream = request.getInstallSnapshotStream(); + if (installSnapshotStream.isPresent()) { + try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) { + request.getSnapshot().serialize(out); + } catch (IOException e) { + // TODO - we should communicate the failure in the CaptureSnapshotReply. + LOG.error("Error serializing snapshot", e); + } + } + + request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()), + installSnapshotStream), ActorRef.noSender()); + } + + /** + * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply. + * + * @param snapshotActor the ShardSnapshotActor + * @param snapshot the snapshot to process + * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed + * on a follower. + * @param replyTo the actor to which to send the CaptureSnapshotReply + */ + public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot, + final Optional installSnapshotStream, final ActorRef replyTo) { + snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender()); } public static Props props() {