X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardSnapshotCohort.java;h=c7bc20f7546599876d4f99a9ca07fc174ed836ed;hp=126e28d9c8c6157794f3b7a12282fb5488d5e2dd;hb=3402cfce32b05957219e54754dd7ca5b0a54cd0e;hpb=3859df9beca8f13f1ff2b2744ed3470a1715bec3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 126e28d9c8..c7bc20f754 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -24,6 +24,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor; import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State; @@ -37,13 +38,15 @@ import org.slf4j.Logger; final class ShardSnapshotCohort implements RaftActorSnapshotCohort { private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply"); + private final InputOutputStreamFactory streamFactory; private final ActorRef snapshotActor; private final ShardDataTree store; private final String logId; private final Logger log; - private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor, - final ShardDataTree store, final Logger log, final String logId) { + ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId, + final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) { + this.streamFactory = requireNonNull(streamFactory); this.snapshotActor = requireNonNull(snapshotActor); this.store = requireNonNull(store); this.log = log; @@ -51,16 +54,19 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort { } static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName, - final ShardDataTree store, final Logger log, final String logId) { + final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) { final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create( FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0); final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read"; + final InputOutputStreamFactory streamFactory = context.isUseLz4Compression() + ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple(); // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all // requests. - final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName); + final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory), + snapshotActorName); - return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId); + return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId); } @Override @@ -99,7 +105,7 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort { @Override public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException { - try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) { + try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) { return ShardDataTreeSnapshot.deserialize(in); } }