X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Factors%2FShardSnapshotActor.java;h=d7d380830fd81b34502fe606ce3ca1e63da1113a;hb=3402cfce32b05957219e54754dd7ca5b0a54cd0e;hp=e7529fb4bfcf3b9f1003fce0c03ae26fd943c3a6;hpb=2faf656bf68dd3843fd59520b27a7ec2abbdcc68;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java index e7529fb4bf..d7d380830f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java @@ -7,18 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore.actors; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Props; -import com.google.common.base.Preconditions; import java.io.IOException; +import java.io.ObjectOutputStream; import java.io.OutputStream; import java.util.Optional; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially @@ -27,8 +28,6 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { - private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class); - // Internal message private static final class SerializeSnapshot { private final ShardDataTreeSnapshot snapshot; @@ -37,9 +36,9 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional installSnapshotStream, final ActorRef replyTo) { - this.snapshot = Preconditions.checkNotNull(snapshot); - this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream); - this.replyTo = Preconditions.checkNotNull(replyTo); + this.snapshot = requireNonNull(snapshot); + this.installSnapshotStream = requireNonNull(installSnapshotStream); + this.replyTo = requireNonNull(replyTo); } ShardDataTreeSnapshot getSnapshot() { @@ -58,12 +57,15 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { //actor name override used for metering. This does not change the "real" actor name private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot"; - private ShardSnapshotActor() { + private final InputOutputStreamFactory streamFactory; + + private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) { super(ACTOR_NAME_FOR_METERING); + this.streamFactory = requireNonNull(streamFactory); } @Override - protected void handleReceive(final Object message) throws Exception { + protected void handleReceive(final Object message) { if (message instanceof SerializeSnapshot) { onSerializeSnapshot((SerializeSnapshot) message); } else { @@ -71,11 +73,11 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { } } - private void onSerializeSnapshot(SerializeSnapshot request) { + private void onSerializeSnapshot(final SerializeSnapshot request) { Optional installSnapshotStream = request.getInstallSnapshotStream(); if (installSnapshotStream.isPresent()) { - try { - request.getSnapshot().serialize(installSnapshotStream.get()); + try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) { + request.getSnapshot().serialize(out); } catch (IOException e) { // TODO - we should communicate the failure in the CaptureSnapshotReply. LOG.error("Error serializing snapshot", e); @@ -86,6 +88,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { installSnapshotStream), ActorRef.noSender()); } + private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException { + return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream)); + } + /** * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply. * @@ -100,7 +106,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering { snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender()); } - public static Props props() { - return Props.create(ShardSnapshotActor.class); + public static Props props(final InputOutputStreamFactory streamFactory) { + return Props.create(ShardSnapshotActor.class, streamFactory); } }