X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Factors%2FShardSnapshotActor.java;h=d7d380830fd81b34502fe606ce3ca1e63da1113a;hb=3402cfce32b05957219e54754dd7ca5b0a54cd0e;hp=00bb424850027bd211b560edb025386da0894b00;hpb=b70b396725749d3fd6ca761f02f4b630f6f4f1ce;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java index 00bb424850..d7d380830f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java @@ -7,11 +7,18 @@ */ package org.opendaylight.controller.cluster.datastore.actors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Props; -import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.Optional; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; /** @@ -24,17 +31,24 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { // Internal message private static final class SerializeSnapshot { private final ShardDataTreeSnapshot snapshot; + private final Optional installSnapshotStream; private final ActorRef replyTo; - SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) { - this.snapshot = Preconditions.checkNotNull(snapshot); - this.replyTo = Preconditions.checkNotNull(replyTo); + SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional installSnapshotStream, + final ActorRef replyTo) { + this.snapshot = requireNonNull(snapshot); + this.installSnapshotStream = requireNonNull(installSnapshotStream); + this.replyTo = requireNonNull(replyTo); } ShardDataTreeSnapshot getSnapshot() { return snapshot; } + Optional getInstallSnapshotStream() { + return installSnapshotStream; + } + ActorRef getReplyTo() { return replyTo; } @@ -43,26 +57,56 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { //actor name override used for metering. This does not change the "real" actor name private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot"; - private ShardSnapshotActor() { + private final InputOutputStreamFactory streamFactory; + + private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) { super(ACTOR_NAME_FOR_METERING); + this.streamFactory = requireNonNull(streamFactory); } @Override - protected void handleReceive(final Object message) throws Exception { + protected void handleReceive(final Object message) { if (message instanceof SerializeSnapshot) { - final SerializeSnapshot request = (SerializeSnapshot) message; - request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender()); + onSerializeSnapshot((SerializeSnapshot) message); } else { unknownMessage(message); } } - public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot, - final ActorRef replyTo) { - snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender()); + private void onSerializeSnapshot(final SerializeSnapshot request) { + Optional installSnapshotStream = request.getInstallSnapshotStream(); + if (installSnapshotStream.isPresent()) { + try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) { + request.getSnapshot().serialize(out); + } catch (IOException e) { + // TODO - we should communicate the failure in the CaptureSnapshotReply. + LOG.error("Error serializing snapshot", e); + } + } + + request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()), + installSnapshotStream), ActorRef.noSender()); + } + + private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException { + return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream)); + } + + /** + * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply. + * + * @param snapshotActor the ShardSnapshotActor + * @param snapshot the snapshot to process + * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed + * on a follower. + * @param replyTo the actor to which to send the CaptureSnapshotReply + */ + public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot, + final Optional installSnapshotStream, final ActorRef replyTo) { + snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender()); } - public static Props props() { - return Props.create(ShardSnapshotActor.class); + public static Props props(final InputOutputStreamFactory streamFactory) { + return Props.create(ShardSnapshotActor.class, streamFactory); } }