From: tadei.bilan
Date: Thu, 25 Jun 2020 12:42:47 +0000 (+0300)
Subject: Add optional lz4 compression for snapshots
X-Git-Tag: v2.0.3~1
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=e7e69069ae5ecaacc9ea0e47cb40cdf68237d636

Add optional lz4 compression for snapshots

Added the ability to use lz4 compression both for snapshots sent to
followers and for snapshots in storage.

JIRA: CONTROLLER-1936
Change-Id: I073120efddde869b10999450057b91e75f0ffe07
Signed-off-by: tadei.bilan
Signed-off-by: Robert Varga
---
diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml
index c3f03074ca..2e26ead940 100644
--- a/opendaylight/md-sal/sal-clustering-commons/pom.xml
+++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml
@@ -156,6 +156,13 @@
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-codec-binfmt</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.7.1</version>
+    </dependency>
+
   </dependencies>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java
new file mode 100644
index 0000000000..01e1098bb0
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.io.ByteSource;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+@Beta
+public abstract class InputOutputStreamFactory {
+    InputOutputStreamFactory() {
+        // Hidden on purpose
+    }
+
+    public static @NonNull InputOutputStreamFactory simple() {
+        return PlainInputOutputStreamSupport.INSTANCE;
+    }
+
+    public static @NonNull InputOutputStreamFactory lz4(final String blockSize) {
+        return lz4(LZ4FrameOutputStream.BLOCKSIZE.valueOf("SIZE_" + blockSize));
+    }
+
+    public static @NonNull InputOutputStreamFactory lz4(final LZ4FrameOutputStream.BLOCKSIZE blockSize) {
+        return new LZ4InputOutputStreamSupport(requireNonNull(blockSize));
+    }
+
+    public abstract @NonNull InputStream createInputStream(ByteSource input) throws IOException;
+
+    public abstract @NonNull InputStream createInputStream(File file) throws IOException;
+
+    public abstract @NonNull OutputStream createOutputStream(File file) throws IOException;
+
+    public abstract @NonNull OutputStream wrapOutputStream(OutputStream output) throws IOException;
+
+    static @NonNull BufferedInputStream defaultCreateInputStream(final File file) throws FileNotFoundException {
+        return new BufferedInputStream(new FileInputStream(file));
+    }
+
+    static @NonNull BufferedOutputStream defaultCreateOutputStream(final File file) throws FileNotFoundException {
+        return new BufferedOutputStream(new FileOutputStream(file));
+    }
+}
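The factory above is the seam the rest of the change plugs into. As a rough usage sketch (not part of this patch; the class name, file path and payload below are made up), a caller picks either the plain or the lz4 variant once and then treats the resulting streams uniformly:

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;

    // Sketch only: file name and payload are illustrative.
    public final class Lz4FactoryExample {
        public static void main(final String[] args) throws IOException {
            // "256KB" resolves to LZ4FrameOutputStream.BLOCKSIZE.SIZE_256KB via the lz4(String) overload above.
            final InputOutputStreamFactory factory = InputOutputStreamFactory.lz4("256KB");
            final File file = new File("example.snapshot");

            try (OutputStream out = factory.createOutputStream(file)) {
                // Written as an LZ4 frame; swapping in InputOutputStreamFactory.simple() writes plain bytes.
                out.write("snapshot-payload".getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = factory.createInputStream(file)) {
                // Reads back the original bytes, transparently decompressing the frame.
                System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
    }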
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java
new file mode 100644
index 0000000000..689b58564b
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.FLG.Bits;
+import net.jpountz.xxhash.XXHashFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class LZ4InputOutputStreamSupport extends InputOutputStreamFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(LZ4InputOutputStreamSupport.class);
+    private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+    private static final XXHashFactory HASH_FACTORY = XXHashFactory.fastestInstance();
+
+    private final LZ4FrameOutputStream.BLOCKSIZE blocksize;
+
+    LZ4InputOutputStreamSupport(final LZ4FrameOutputStream.BLOCKSIZE blocksize) {
+        this.blocksize = blocksize;
+    }
+
+    @Override
+    public InputStream createInputStream(final ByteSource input) throws IOException {
+        final InputStream stream = input.openStream();
+        try {
+            return new LZ4FrameInputStream(stream, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+        } catch (IOException e) {
+            stream.close();
+            LOG.warn("Error loading with lz4 decompression, using default one", e);
+            return input.openBufferedStream();
+        }
+    }
+
+    @Override
+    public InputStream createInputStream(final File file) throws IOException {
+        final FileInputStream fileInput = new FileInputStream(file);
+        try {
+            return new LZ4FrameInputStream(fileInput, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+        } catch (IOException e) {
+            fileInput.close();
+            LOG.warn("Error loading file with lz4 decompression, using default one", e);
+            return defaultCreateInputStream(file);
+        }
+    }
+
+    @Override
+    public OutputStream createOutputStream(final File file) throws IOException {
+        return new LZ4FrameOutputStream(new FileOutputStream(file), blocksize, -1, LZ4_FACTORY.fastCompressor(),
+            HASH_FACTORY.hash32(), Bits.BLOCK_INDEPENDENCE);
+    }
+
+    @Override
+    public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+        return new LZ4FrameOutputStream(output, blocksize, -1, LZ4_FACTORY.fastCompressor(), HASH_FACTORY.hash32(),
+            Bits.BLOCK_INDEPENDENCE);
+    }
+}
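One behaviour of the class above worth calling out: both createInputStream() overloads probe for an LZ4 frame header and, if the probe fails, log a warning and fall back to a plain stream, so snapshots written before compression was enabled stay readable. A minimal sketch of that fallback (the class name and input bytes are made up), relying only on the API shown in this patch plus Guava's ByteSource.wrap():

    import com.google.common.io.ByteSource;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;

    // Sketch only: demonstrates the graceful-fallback path.
    public final class Lz4FallbackExample {
        public static void main(final String[] args) throws IOException {
            final InputOutputStreamFactory lz4 = InputOutputStreamFactory.lz4("256KB");
            // Plain bytes: not a valid LZ4 frame, so LZ4FrameInputStream construction fails ...
            final ByteSource plain = ByteSource.wrap("not-compressed".getBytes(StandardCharsets.UTF_8));
            // ... and the factory returns an uncompressed buffered stream instead.
            try (InputStream in = lz4.createInputStream(plain)) {
                System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
    }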
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java
new file mode 100644
index 0000000000..7287def84c
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+final class PlainInputOutputStreamSupport extends InputOutputStreamFactory {
+    static final @NonNull PlainInputOutputStreamSupport INSTANCE = new PlainInputOutputStreamSupport();
+
+    private PlainInputOutputStreamSupport() {
+        // Hidden on purpose
+    }
+
+    @Override
+    public InputStream createInputStream(final ByteSource input) throws IOException {
+        return input.openBufferedStream();
+    }
+
+    @Override
+    public InputStream createInputStream(final File file) throws IOException {
+        return defaultCreateInputStream(file);
+    }
+
+    @Override
+    public OutputStream createOutputStream(final File file) throws IOException {
+        return defaultCreateOutputStream(file);
+    }
+
+    @Override
+    public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+        return output;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
index 42c90b80d4..695f34ecee 100644
--- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
@@ -22,10 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -48,6 +46,7 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -63,9 +62,9 @@ import scala.concurrent.Future;
  */
 public class LocalSnapshotStore extends SnapshotStore {
     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
 
+    private final InputOutputStreamFactory streamFactory;
     private final ExecutionContext executionContext;
     private final int maxLoadAttempts;
     private final File snapshotDir;
@@ -74,9 +73,18 @@ public class LocalSnapshotStore extends SnapshotStore {
         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
         snapshotDir = new File(config.getString("dir"));
 
-        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        final int localMaxLoadAttempts = config.getInt("max-load-attempts");
         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
 
+        if (config.getBoolean("use-lz4-compression")) {
+            final String size = config.getString("lz4-blocksize");
+            streamFactory = InputOutputStreamFactory.lz4(size);
+            LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+        } else {
+            streamFactory = InputOutputStreamFactory.simple();
+            LOG.debug("Using plain Input/Output Stream");
+        }
+
         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
     }
@@ -140,7 +148,7 @@
     private Object deserialize(final File file) throws IOException {
         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
             (Callable<Object>) () -> {
-                try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+                try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
                     return in.readObject();
                 } catch (ClassNotFoundException e) {
                     throw new IOException("Error loading snapshot file " + file, e);
@@ -178,7 +186,7 @@
 
         LOG.debug("Saving to temp file: {}", temp);
 
-        try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
+        try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
             out.writeObject(snapshot);
         } catch (IOException e) {
             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
index 83b1ece6f2..ece191671c 100644
--- a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
+++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
@@ -3,5 +3,6 @@ akka {
     snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
     snapshot-store.plugin = akka.persistence.snapshot-store.local
     snapshot-store.local.dir = "target/snapshots"
+    snapshot-store.local.use-lz4-compression = false
   }
 }
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
index 5cde9e4c46..88443c00b3 100644
--- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
+++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
@@ -34,6 +34,11 @@ odl-cluster-data {
       # The relative path is always relative to KARAF_HOME.
       # snapshot-store.local.dir = "target/snapshots"
+
+      # Use lz4 compression for LocalSnapshotStore snapshots
+      snapshot-store.local.use-lz4-compression = false
+      # Size of blocks for lz4 compression: 64KB, 256KB, 1MB or 4MB
+      snapshot-store.local.lz4-blocksize = 256KB
     }
     disable-default-actor-system-quarantined-event-handling = "false"
   }
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
index f1374b11d7..7d4903e17c 100644
--- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
+++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
@@ -122,4 +122,7 @@ operational.persistent=false
 #recovery-snapshot-interval-seconds=0
 
 # Option to take a snapshot when the entire DataTree root or top-level container is overwritten
-snapshot-on-root-overwrite=false
\ No newline at end of file
+snapshot-on-root-overwrite=false
+
+# Enable lz4 compression for snapshots sent from leader to followers
+#use-lz4-compression=true
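For reference, the two settings above are independent: the akka.conf keys only affect snapshots persisted on disk by LocalSnapshotStore, while the datastore.cfg key only affects snapshots streamed from a shard leader to its followers. An operator enabling both would end up with excerpts roughly like the following (illustrative fragments, not complete files; the akka.conf keys live in the same odl-cluster-data section shown above):

    # akka.conf: compress snapshots written by LocalSnapshotStore
    snapshot-store.local.use-lz4-compression = true
    snapshot-store.local.lz4-blocksize = 256KB

    # datastore.cfg: compress install snapshots sent from leader to followers
    use-lz4-compression=true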
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
index 43af39ff31..69131a0d1c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
@@ -100,6 +100,7 @@ public class DatastoreContext implements ClientActorConfig {
     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+    private boolean useLz4Compression = false;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -144,6 +145,7 @@ public class DatastoreContext implements ClientActorConfig {
         this.requestTimeout = other.requestTimeout;
         this.noProgressTimeout = other.noProgressTimeout;
         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
+        this.useLz4Compression = other.useLz4Compression;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -352,6 +354,10 @@ public class DatastoreContext implements ClientActorConfig {
         return useTellBasedProtocol;
     }
 
+    public boolean isUseLz4Compression() {
+        return useLz4Compression;
+    }
+
     @Override
     public int getMaximumMessageSliceSize() {
         return maximumMessageSliceSize;
@@ -593,6 +599,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder useLz4Compression(final boolean value) {
+            datastoreContext.useLz4Compression = value;
+            return this;
+        }
+
         /**
          * For unit tests only.
         */
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index e051953f95..eeacdd9b6f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@ -253,7 +253,7 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-                this.name);
+                this.name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
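The flag also has a programmatic path through the builder method added above. A minimal sketch (the class name is made up, and DatastoreContext.newBuilder() plus Builder.build() are assumed to be the pre-existing entry points; only useLz4Compression() and isUseLz4Compression() come from this patch):

    import org.opendaylight.controller.cluster.datastore.DatastoreContext;

    // Sketch only: shows how the new flag is carried by the context object that Shard reads.
    public final class Lz4ContextExample {
        public static void main(final String[] args) {
            final DatastoreContext context = DatastoreContext.newBuilder()
                    .useLz4Compression(true)
                    .build();
            System.out.println("lz4 for install snapshots: " + context.isUseLz4Compression());
        }
    }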
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
index 126e28d9c8..c7bc20f754 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
@@ -24,6 +24,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
@@ -37,13 +38,15 @@ import org.slf4j.Logger;
 final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
 
+    private final InputOutputStreamFactory streamFactory;
     private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
-    private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
-            final ShardDataTree store, final Logger log, final String logId) {
+    ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId,
+            final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) {
+        this.streamFactory = requireNonNull(streamFactory);
         this.snapshotActor = requireNonNull(snapshotActor);
         this.store = requireNonNull(store);
         this.log = log;
@@ -51,16 +54,19 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
-            final ShardDataTree store, final Logger log, final String logId) {
+            final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) {
         final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
             FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
         final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
 
+        final InputOutputStreamFactory streamFactory = context.isUseLz4Compression()
+                ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple();
+
         // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
         // requests.
-        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
+        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory),
+            snapshotActorName);
 
-        return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
+        return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId);
     }
 
     @Override
@@ -99,7 +105,7 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     @Override
     public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
-        try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+        try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) {
            return ShardDataTreeSnapshot.deserialize(in);
         }
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
index bdc2ec48d7..d7d380830f 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
@@ -18,6 +18,7 @@ import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 
 /**
@@ -56,8 +57,11 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     //actor name override used for metering. This does not change the "real" actor name
     private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
 
-    private ShardSnapshotActor() {
+    private final InputOutputStreamFactory streamFactory;
+
+    private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
         super(ACTOR_NAME_FOR_METERING);
+        this.streamFactory = requireNonNull(streamFactory);
     }
 
     @Override
@@ -72,7 +76,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     private void onSerializeSnapshot(final SerializeSnapshot request) {
         Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
         if (installSnapshotStream.isPresent()) {
-            try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+            try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
                 request.getSnapshot().serialize(out);
             } catch (IOException e) {
                 // TODO - we should communicate the failure in the CaptureSnapshotReply.
@@ -84,6 +88,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
             installSnapshotStream), ActorRef.noSender());
     }
 
+    private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+        return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+    }
+
     /**
      * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
      *
@@ -98,7 +106,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
         snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
-    public static Props props() {
-        return Props.create(ShardSnapshotActor.class);
+    public static Props props(final InputOutputStreamFactory streamFactory) {
+        return Props.create(ShardSnapshotActor.class, streamFactory);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
index acb22256ce..f0d6693f58 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
@@ -286,6 +286,13 @@ module distributed-datastore-provider {
             type non-zero-uint32-type;
             description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
         }
+
+        leaf use-lz4-compression {
+            default false;
+            type boolean;
+            description "Use lz4 compression for snapshots sent from leader to followers. For snapshots stored
+                         by LocalSnapshotStore, use the akka.conf configuration instead.";
+        }
     }
 
     container data-store-properties-container {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
index 6a0def0901..ae7c9b4963 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java
@@ -13,7 +13,7 @@ import static org.junit.Assert.assertTrue;
 
 import akka.actor.ActorRef;
 import akka.testkit.javadsl.TestKit;
-import java.io.ByteArrayInputStream;
+import com.google.common.io.ByteSource;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -23,18 +23,21 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 public class ShardSnapshotActorTest extends AbstractActorTest {
+    private static final InputOutputStreamFactory STREAM_FACTORY = InputOutputStreamFactory.simple();
+
     private static final NormalizedNode DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
     private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
             final boolean withInstallSnapshot) throws Exception {
         final TestKit kit = new TestKit(getSystem());
-        final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
+        final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(STREAM_FACTORY), testName);
         kit.watch(snapshotActor);
 
         final NormalizedNode expectedRoot = snapshot.getRootNode().get();
@@ -50,8 +53,8 @@ public class ShardSnapshotActorTest extends AbstractActorTest {
 
         if (installSnapshotStream != null) {
             final ShardDataTreeSnapshot deserialized;
-            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
-                    installSnapshotStream.toByteArray()))) {
+            try (ObjectInputStream in = new ObjectInputStream(STREAM_FACTORY.createInputStream(
+                    ByteSource.wrap(installSnapshotStream.toByteArray())))) {
                 deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
             }