*/
package org.opendaylight.controller.cluster.datastore.actors;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Props;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Optional;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
/**
// Internal message
private static final class SerializeSnapshot {
private final ShardDataTreeSnapshot snapshot;
+ private final Optional<OutputStream> installSnapshotStream;
private final ActorRef replyTo;
- SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final ActorRef replyTo) {
- this.snapshot = Preconditions.checkNotNull(snapshot);
- this.replyTo = Preconditions.checkNotNull(replyTo);
+ SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
+ final ActorRef replyTo) {
+ this.snapshot = requireNonNull(snapshot);
+ this.installSnapshotStream = requireNonNull(installSnapshotStream);
+ this.replyTo = requireNonNull(replyTo);
}
ShardDataTreeSnapshot getSnapshot() {
return snapshot;
}
+ Optional<OutputStream> getInstallSnapshotStream() {
+ return installSnapshotStream;
+ }
+
ActorRef getReplyTo() {
return replyTo;
}
//actor name override used for metering. This does not change the "real" actor name
private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
- private ShardSnapshotActor() {
+ private final InputOutputStreamFactory streamFactory;
+
+ private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
super(ACTOR_NAME_FOR_METERING);
+ this.streamFactory = requireNonNull(streamFactory);
}
@Override
- protected void handleReceive(final Object message) throws Exception {
+ protected void handleReceive(final Object message) {
if (message instanceof SerializeSnapshot) {
- final SerializeSnapshot request = (SerializeSnapshot) message;
- request.getReplyTo().tell(new CaptureSnapshotReply(request.getSnapshot().serialize()), ActorRef.noSender());
+ onSerializeSnapshot((SerializeSnapshot) message);
} else {
unknownMessage(message);
}
}
- public static void requestSnapshot(final ActorRef snapshotActor, ShardDataTreeSnapshot snapshot,
- final ActorRef replyTo) {
- snapshotActor.tell(new SerializeSnapshot(snapshot, replyTo), ActorRef.noSender());
+ private void onSerializeSnapshot(final SerializeSnapshot request) {
+ Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
+ if (installSnapshotStream.isPresent()) {
+ try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
+ request.getSnapshot().serialize(out);
+ } catch (IOException e) {
+ // TODO - we should communicate the failure in the CaptureSnapshotReply.
+ LOG.error("Error serializing snapshot", e);
+ }
+ }
+
+ request.getReplyTo().tell(new CaptureSnapshotReply(new ShardSnapshotState(request.getSnapshot()),
+ installSnapshotStream), ActorRef.noSender());
+ }
+
+ private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+ return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+ }
+
+ /**
+ * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
+ *
+ * @param snapshotActor the ShardSnapshotActor
+ * @param snapshot the snapshot to process
+ * @param installSnapshotStream Optional OutputStream that is present if the snapshot is to also be installed
+ * on a follower.
+ * @param replyTo the actor to which to send the CaptureSnapshotReply
+ */
+ public static void requestSnapshot(final ActorRef snapshotActor, final ShardDataTreeSnapshot snapshot,
+ final Optional<OutputStream> installSnapshotStream, final ActorRef replyTo) {
+ snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
}
- public static Props props() {
- return Props.create(ShardSnapshotActor.class);
+ public static Props props(final InputOutputStreamFactory streamFactory) {
+ return Props.create(ShardSnapshotActor.class, streamFactory);
}
}