Bug 7521: Convert Snapshot to store a State instance
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / ShardSnapshotActor.java
index 00bb424850027bd211b560edb025386da0894b00..e7529fb4bfcf3b9f1003fce0c03ae26fd943c3a6 100644 (file)
@@ -10,9 +10,15 @@ package org.opendaylight.controller.cluster.datastore.actors;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is an offload actor, which is given an isolated snapshot of the data tree. It performs the potentially
@@ -21,13 +27,18 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
  * @author Robert Varga
  */
 public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardSnapshotActor.class);
+
     // Internal message
     private static final class SerializeSnapshot {
         private final ShardDataTreeSnapshot snapshot;
+        private final Optional<OutputStream> installSnapshotStream;
         private final ActorRef replyTo;
 
-        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
+        SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
+                final ActorRef replyTo) {
             this.snapshot = Preconditions.checkNotNull(snapshot);
+            this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
             this.replyTo = Preconditions.checkNotNull(replyTo);
         }
 
@@ -35,6 +46,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
             return snapshot;
         }
 
+        Optional<OutputStream> getInstallSnapshotStream() {
+            return installSnapshotStream;
+        }
+
         ActorRef getReplyTo() {
             return replyTo;
         }
@@ -50,16 +65,39 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     @Override
     protected void handleReceive(final Object message) throws Exception {
         if (message instanceof SerializeSnapshot) {
-            final SerializeSnapshot request = (SerializeSnapshot) message;
-            request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+            onSerializeSnapshot((SerializeSnapshot) message);
         } else {
             unknownMessage(message);
         }
     }
 
-    public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
-            final ActorRef replyTo) {
-        snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+    private void onSerializeSnapshot(SerializeSnapshot request) {
+        Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
+        if (installSnapshotStream.isPresent()) {
+            try {
+                request.getSnapshot().serialize(installSnapshotStream.get());
+            } catch (IOException e) {
+                // TODO - we should communicate the failure in the CaptureSnapshotReply.
+                LOG.error("Error serializing snapshot", e);
+            }
+        }
+
+        request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()),
+                installSnapshotStream), ActorRef.noSender());
+    }
+
+    /**
+     * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
+     *
+     * @param snapshotActor the ShardSnapshotActor
+     * @param snapshot the snapshot to process
+     * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+     *        on a follower.
+     * @param replyTo the actor to which to send the CaptureSnapshotReply
+     */
+    public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot,
+            final Optional<OutputStream> installSnapshotStream, final ActorRef replyTo) {
+        snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
     public static Props props() {