import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Optional;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
/**
// Internal message
private static final class SerializeSnapshot {
private final ShardDataTreeSnapshot snapshot;
+ private final Optional<OutputStream> installSnapshotStream;
private final ActorRef replyTo;
- SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
+ SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
+ final ActorRef replyTo) {
this.snapshot = Preconditions.checkNotNull(snapshot);
+ this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
this.replyTo = Preconditions.checkNotNull(replyTo);
}
return snapshot;
}
+ Optional<OutputStream> getInstallSnapshotStream() {
+ return installSnapshotStream;
+ }
+
ActorRef getReplyTo() {
return replyTo;
}
@Override
protected void handleReceive(final Object message) throws Exception {
if (message instanceof SerializeSnapshot) {
- final SerializeSnapshot request = (SerializeSnapshot) message;
- request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+ onSerializeSnapshot((SerializeSnapshot) message);
} else {
unknownMessage(message);
}
}
- public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
- final ActorRef replyTo) {
- snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+ private void onSerializeSnapshot(final SerializeSnapshot request) {
+ Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
+ if (installSnapshotStream.isPresent()) {
+ try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+ request.getSnapshot().serialize(out);
+ } catch (IOException e) {
+ // TODO - we should communicate the failure in the CaptureSnapshotReply.
+ LOG.error("Error serializing snapshot", e);
+ }
+ }
+
+ request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()),
+ installSnapshotStream), ActorRef.noSender());
+ }
+
+ /**
+ * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
+ *
+ * @param snapshotActor the ShardSnapshotActor
+ * @param snapshot the snapshot to process
+ * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+ * on a follower.
+ * @param replyTo the actor to which to send the CaptureSnapshotReply
+ */
+ public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot,
+ final Optional<OutputStream> installSnapshotStream, final ActorRef replyTo) {
+ snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
}
public static Props props() {