Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / ShardSnapshotActor.java
index e7529fb4bfcf3b9f1003fce0c03ae26fd943c3a6..d7d380830fd81b34502fe606ce3ca1e63da1113a 100644 (file)
@@ -7,18 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore.actors;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import com.google.common.base.Preconditions;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
@@ -27,8 +28,6 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
-
     // Internal message
     private static final class SerializeSnapshot {
         private final ShardDataTreeSnapshot snapshot;
@@ -37,9 +36,9 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
 
         SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
                 final ActorRef replyTo) {
-            this.snapshot = Preconditions.checkNotNull(snapshot);
-            this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
-            this.replyTo = Preconditions.checkNotNull(replyTo);
+            this.snapshot = requireNonNull(snapshot);
+            this.installSnapshotStream = requireNonNull(installSnapshotStream);
+            this.replyTo = requireNonNull(replyTo);
         }
 
         ShardDataTreeSnapshot getSnapshot() {
@@ -58,12 +57,15 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     //actor name override used for metering. This does not change the "real" actor name
     private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
 
-    private ShardSnapshotActor() {
+    private final InputOutputStreamFactory streamFactory;
+
+    private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
         super(ACTOR_NAME_FOR_METERING);
+        this.streamFactory = requireNonNull(streamFactory);
     }
 
     @Override
-    protected void handleReceive(final Object message) throws Exception {
+    protected void handleReceive(final Object message) {
         if (message instanceof SerializeSnapshot) {
             onSerializeSnapshot((SerializeSnapshot) message);
         } else {
@@ -71,11 +73,11 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
         }
     }
 
-    private void onSerializeSnapshot(SerializeSnapshot request) {
+    private void onSerializeSnapshot(final SerializeSnapshot request) {
         Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
         if (installSnapshotStream.isPresent()) {
-            try {
-                request.getSnapshot().serialize(installSnapshotStream.get());
+            try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
+                request.getSnapshot().serialize(out);
             } catch (IOException e) {
                 // TODO - we should communicate the failure in the CaptureSnapshotReply.
                 LOG.error("Error serializing snapshot", e);
@@ -86,6 +88,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
                 installSnapshotStream), ActorRef.noSender());
     }
 
+    private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+        return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+    }
+
     /**
      * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
      *
@@ -100,7 +106,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
         snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
-    public static Props props() {
-        return Props.create(ShardSnapshotActor.class);
+    public static Props props(final InputOutputStreamFactory streamFactory) {
+        return Props.create(ShardSnapshotActor.class, streamFactory);
     }
 }