*/
package org.opendaylight.controller.cluster.datastore.actors;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Props;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
/**
SerializeSnapshot(final ShardDataTreeSnapshot snapshot, final Optional<OutputStream> installSnapshotStream,
final ActorRef replyTo) {
- this.snapshot = Preconditions.checkNotNull(snapshot);
- this.installSnapshotStream = Preconditions.checkNotNull(installSnapshotStream);
- this.replyTo = Preconditions.checkNotNull(replyTo);
+ this.snapshot = requireNonNull(snapshot);
+ this.installSnapshotStream = requireNonNull(installSnapshotStream);
+ this.replyTo = requireNonNull(replyTo);
}
ShardDataTreeSnapshot getSnapshot() {
//actor name override used for metering. This does not change the "real" actor name
private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
- private ShardSnapshotActor() {
+ private final InputOutputStreamFactory streamFactory;
+
+ private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
super(ACTOR_NAME_FOR_METERING);
+ this.streamFactory = requireNonNull(streamFactory);
}
@Override
private void onSerializeSnapshot(final SerializeSnapshot request) {
Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
if (installSnapshotStream.isPresent()) {
- try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+ try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
request.getSnapshot().serialize(out);
} catch (IOException e) {
// TODO - we should communicate the failure in the CaptureSnapshotReply.
installSnapshotStream), ActorRef.noSender());
}
+ private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+ return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+ }
+
/**
* Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
*
snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
}
- public static Props props() {
- return Props.create(ShardSnapshotActor.class);
+ public static Props props(final InputOutputStreamFactory streamFactory) {
+ return Props.create(ShardSnapshotActor.class, streamFactory);
}
}