Add optional lz4 compression for snapshots 92/90092/27
authortadei.bilan <tadei.bilan@pantheon.tech>
Thu, 25 Jun 2020 12:42:47 +0000 (15:42 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 26 Jul 2020 15:19:59 +0000 (17:19 +0200)
Added ability to use lz4 compression both for snapshots sent to
followers and snapshots in storage.

JIRA: CONTROLLER-1936
Change-Id: I073120efddde869b10999450057b91e75f0ffe07
Signed-off-by: tadei.bilan <tadei.bilan@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
14 files changed:
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/ShardSnapshotActorTest.java

index c3f03074ca380a78e800888823999065929ebe84..2e26ead94094c8a0a56470a3bf15f09a49d14f1f 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-codec-binfmt</artifactId>
     </dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-codec-binfmt</artifactId>
     </dependency>
+
+    <!-- Compression -->
+    <dependency>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
+      <version>1.7.1</version>
+    </dependency>
   </dependencies>
 
   <build>
   </dependencies>
 
   <build>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/InputOutputStreamFactory.java
new file mode 100644 (file)
index 0000000..01e1098
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.io.ByteSource;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+@Beta
+public abstract class InputOutputStreamFactory {
+    InputOutputStreamFactory() {
+        // Hidden on purpose
+    }
+
+    public static @NonNull InputOutputStreamFactory simple() {
+        return PlainInputOutputStreamSupport.INSTANCE;
+    }
+
+    public static @NonNull InputOutputStreamFactory lz4(final String blockSize) {
+        return lz4(LZ4FrameOutputStream.BLOCKSIZE.valueOf("SIZE_" + blockSize));
+    }
+
+    public static @NonNull InputOutputStreamFactory lz4(final LZ4FrameOutputStream.BLOCKSIZE blockSize) {
+        return new LZ4InputOutputStreamSupport(requireNonNull(blockSize));
+    }
+
+    public abstract @NonNull InputStream createInputStream(ByteSource input) throws IOException;
+
+    public abstract @NonNull InputStream createInputStream(File file) throws IOException;
+
+    public abstract @NonNull OutputStream createOutputStream(File file) throws IOException;
+
+    public abstract @NonNull OutputStream wrapOutputStream(OutputStream output) throws IOException;
+
+    static @NonNull BufferedInputStream defaultCreateInputStream(final File file) throws FileNotFoundException {
+        return new BufferedInputStream(new FileInputStream(file));
+    }
+
+    static @NonNull BufferedOutputStream defaultCreateOutputStream(final File file) throws FileNotFoundException {
+        return new BufferedOutputStream(new FileOutputStream(file));
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/LZ4InputOutputStreamSupport.java
new file mode 100644 (file)
index 0000000..689b585
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.FLG.Bits;
+import net.jpountz.xxhash.XXHashFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class LZ4InputOutputStreamSupport extends InputOutputStreamFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(LZ4InputOutputStreamSupport.class);
+    private static final LZ4Factory LZ4_FACTORY = LZ4Factory.fastestInstance();
+    private static final XXHashFactory HASH_FACTORY = XXHashFactory.fastestInstance();
+
+    private final LZ4FrameOutputStream.BLOCKSIZE blocksize;
+
+    LZ4InputOutputStreamSupport(final LZ4FrameOutputStream.BLOCKSIZE blocksize) {
+        this.blocksize = blocksize;
+    }
+
+    @Override
+    public InputStream createInputStream(final ByteSource input) throws IOException {
+        final InputStream stream = input.openStream();
+        try {
+            return new LZ4FrameInputStream(stream, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+        } catch (IOException e) {
+            stream.close();
+            LOG.warn("Error loading with lz4 decompression, using default one", e);
+            return input.openBufferedStream();
+        }
+    }
+
+    @Override
+    public InputStream createInputStream(final File file) throws IOException {
+        final FileInputStream fileInput = new FileInputStream(file);
+        try {
+            return new LZ4FrameInputStream(fileInput, LZ4_FACTORY.safeDecompressor(), HASH_FACTORY.hash32());
+        } catch (IOException e) {
+            fileInput.close();
+            LOG.warn("Error loading file with lz4 decompression, using default one", e);
+            return defaultCreateInputStream(file);
+        }
+    }
+
+    @Override
+    public OutputStream createOutputStream(final File file) throws IOException {
+        return new LZ4FrameOutputStream(new FileOutputStream(file), blocksize, -1, LZ4_FACTORY.fastCompressor(),
+            HASH_FACTORY.hash32(), Bits.BLOCK_INDEPENDENCE);
+    }
+
+    @Override
+    public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+        return new LZ4FrameOutputStream(output, blocksize, -1, LZ4_FACTORY.fastCompressor(), HASH_FACTORY.hash32(),
+            Bits.BLOCK_INDEPENDENCE);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/PlainInputOutputStreamSupport.java
new file mode 100644 (file)
index 0000000..7287def
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import com.google.common.io.ByteSource;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.eclipse.jdt.annotation.NonNull;
+
+final class PlainInputOutputStreamSupport extends InputOutputStreamFactory {
+    static final @NonNull PlainInputOutputStreamSupport INSTANCE = new PlainInputOutputStreamSupport();
+
+    private PlainInputOutputStreamSupport() {
+        // Hidden on purpose
+    }
+
+    @Override
+    public InputStream createInputStream(final ByteSource input) throws IOException {
+        return input.openBufferedStream();
+    }
+
+    @Override
+    public InputStream createInputStream(final File file) throws IOException {
+        return defaultCreateInputStream(file);
+    }
+
+    @Override
+    public OutputStream createOutputStream(final File file) throws IOException {
+        return defaultCreateOutputStream(file);
+    }
+
+    @Override
+    public OutputStream wrapOutputStream(final OutputStream output) throws IOException {
+        return output;
+    }
+}
index 42c90b80d4df0ca70e6fbb95ba8f5be9b8cadb6b..695f34ecee5fe7647fc310ee6f6572512c6ae57f 100644 (file)
@@ -22,10 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import java.io.BufferedInputStream;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -48,6 +46,7 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.Nullable;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -63,9 +62,9 @@ import scala.concurrent.Future;
  */
 public class LocalSnapshotStore extends SnapshotStore {
     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
  */
 public class LocalSnapshotStore extends SnapshotStore {
     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
 
     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
 
+    private final InputOutputStreamFactory streamFactory;
     private final ExecutionContext executionContext;
     private final int maxLoadAttempts;
     private final File snapshotDir;
     private final ExecutionContext executionContext;
     private final int maxLoadAttempts;
     private final File snapshotDir;
@@ -74,9 +73,18 @@ public class LocalSnapshotStore extends SnapshotStore {
         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
         snapshotDir = new File(config.getString("dir"));
 
         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
         snapshotDir = new File(config.getString("dir"));
 
-        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        final int localMaxLoadAttempts = config.getInt("max-load-attempts");
         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
 
         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
 
+        if (config.getBoolean("use-lz4-compression")) {
+            final String size = config.getString("lz4-blocksize");
+            streamFactory = InputOutputStreamFactory.lz4(size);
+            LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+        } else {
+            streamFactory = InputOutputStreamFactory.simple();
+            LOG.debug("Using plain Input/Output Stream");
+        }
+
         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
     }
 
         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
     }
 
@@ -140,7 +148,7 @@ public class LocalSnapshotStore extends SnapshotStore {
     private Object deserialize(final File file) throws IOException {
         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
             (Callable<Object>) () -> {
     private Object deserialize(final File file) throws IOException {
         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
             (Callable<Object>) () -> {
-                try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+                try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
                     return in.readObject();
                 } catch (ClassNotFoundException e) {
                     throw new IOException("Error loading snapshot file " + file, e);
                     return in.readObject();
                 } catch (ClassNotFoundException e) {
                     throw new IOException("Error loading snapshot file " + file, e);
@@ -178,7 +186,7 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Saving to temp file: {}", temp);
 
 
         LOG.debug("Saving to temp file: {}", temp);
 
-        try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
+        try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
             out.writeObject(snapshot);
         } catch (IOException e) {
             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
             out.writeObject(snapshot);
         } catch (IOException e) {
             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
index 83b1ece6f21b56410da628c28fecebb3c682f50d..ece191671c7f7782acbe8cd54ec7380230eaa64a 100644 (file)
@@ -3,5 +3,6 @@ akka {
       snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
       snapshot-store.plugin = akka.persistence.snapshot-store.local
       snapshot-store.local.dir = "target/snapshots"
       snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
       snapshot-store.plugin = akka.persistence.snapshot-store.local
       snapshot-store.local.dir = "target/snapshots"
+      snapshot-store.local.use-lz4-compression = false
     }
 }
     }
 }
index 5cde9e4c46251c9ab87e49307b5989cda1fb016b..88443c00b395f2baa57495893c8535882c798411 100644 (file)
@@ -34,6 +34,11 @@ odl-cluster-data {
       # The relative path is always relative to KARAF_HOME.
 
       # snapshot-store.local.dir = "target/snapshots"
       # The relative path is always relative to KARAF_HOME.
 
       # snapshot-store.local.dir = "target/snapshots"
+
+      # Use lz4 compression for LocalSnapshotStore snapshots
+      snapshot-store.local.use-lz4-compression = false
+      # Size of blocks for lz4 compression: 64KB, 256KB, 1MB or 4MB
+      snapshot-store.local.lz4-blocksize = 256KB
     }
     disable-default-actor-system-quarantined-event-handling = "false"
   }
     }
     disable-default-actor-system-quarantined-event-handling = "false"
   }
index f1374b11d7481f93c10aab090d1ecc5c78bc7564..7d4903e17c9b88ebd9d7986902f1389f461e872f 100644 (file)
@@ -122,4 +122,7 @@ operational.persistent=false
 #recovery-snapshot-interval-seconds=0
 
 # Option to take a snapshot when the entire DataTree root or top-level container is overwritten
 #recovery-snapshot-interval-seconds=0
 
 # Option to take a snapshot when the entire DataTree root or top-level container is overwritten
-snapshot-on-root-overwrite=false
\ No newline at end of file
+snapshot-on-root-overwrite=false
+
+# Enable lz4 compression for snapshots sent from leader to followers
+#use-lz4-compression=true
index 43af39ff31182759f94744354cd3584ee7fbd75e..69131a0d1c19a238e0ba57df3786bad3b93caa65 100644 (file)
@@ -100,6 +100,7 @@ public class DatastoreContext implements ClientActorConfig {
     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
     private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
     private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
     private int initialPayloadSerializedBufferCapacity = DEFAULT_INITIAL_PAYLOAD_SERIALIZED_BUFFER_CAPACITY;
+    private boolean useLz4Compression = false;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -144,6 +145,7 @@ public class DatastoreContext implements ClientActorConfig {
         this.requestTimeout = other.requestTimeout;
         this.noProgressTimeout = other.noProgressTimeout;
         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
         this.requestTimeout = other.requestTimeout;
         this.noProgressTimeout = other.noProgressTimeout;
         this.initialPayloadSerializedBufferCapacity = other.initialPayloadSerializedBufferCapacity;
+        this.useLz4Compression = other.useLz4Compression;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -352,6 +354,10 @@ public class DatastoreContext implements ClientActorConfig {
         return useTellBasedProtocol;
     }
 
         return useTellBasedProtocol;
     }
 
+    public boolean isUseLz4Compression() {
+        return useLz4Compression;
+    }
+
     @Override
     public int getMaximumMessageSliceSize() {
         return maximumMessageSliceSize;
     @Override
     public int getMaximumMessageSliceSize() {
         return maximumMessageSliceSize;
@@ -593,6 +599,11 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
             return this;
         }
 
+        public Builder useLz4Compression(final boolean value) {
+            datastoreContext.useLz4Compression = value;
+            return this;
+        }
+
         /**
          * For unit tests only.
          */
         /**
          * For unit tests only.
          */
index e051953f9581bea48c197b7774118ae534c2969d..eeacdd9b6ffa9cb247c698d64c68936f8e594caa 100644 (file)
@@ -253,7 +253,7 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-            this.name);
+            this.name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
index 126e28d9c8c6157794f3b7a12282fb5488d5e2dd..c7bc20f7546599876d4f99a9ca07fc174ed836ed 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.actors.ShardSnapshotActor;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
@@ -37,13 +38,15 @@ import org.slf4j.Logger;
 final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
 
 final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     private static final FrontendType SNAPSHOT_APPLY = FrontendType.forName("snapshot-apply");
 
+    private final InputOutputStreamFactory streamFactory;
     private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
     private final ActorRef snapshotActor;
     private final ShardDataTree store;
     private final String logId;
     private final Logger log;
 
-    private ShardSnapshotCohort(final LocalHistoryIdentifier applyHistoryId, final ActorRef snapshotActor,
-            final ShardDataTree store, final Logger log, final String logId) {
+    ShardSnapshotCohort(final InputOutputStreamFactory streamFactory, final LocalHistoryIdentifier applyHistoryId,
+            final ActorRef snapshotActor, final ShardDataTree store, final Logger log, final String logId) {
+        this.streamFactory = requireNonNull(streamFactory);
         this.snapshotActor = requireNonNull(snapshotActor);
         this.store = requireNonNull(store);
         this.log = log;
         this.snapshotActor = requireNonNull(snapshotActor);
         this.store = requireNonNull(store);
         this.log = log;
@@ -51,16 +54,19 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
     }
 
     static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
     }
 
     static ShardSnapshotCohort create(final ActorContext actorContext, final MemberName memberName,
-            final ShardDataTree store, final Logger log, final String logId) {
+            final ShardDataTree store, final Logger log, final String logId, final DatastoreContext context) {
         final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
             FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
         final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
 
         final LocalHistoryIdentifier applyHistoryId = new LocalHistoryIdentifier(ClientIdentifier.create(
             FrontendIdentifier.create(memberName, SNAPSHOT_APPLY), 0), 0);
         final String snapshotActorName = "shard-" + memberName.getName() + ':' + "snapshot-read";
 
+        final InputOutputStreamFactory streamFactory = context.isUseLz4Compression()
+                ? InputOutputStreamFactory.lz4("256KB") : InputOutputStreamFactory.simple();
         // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
         // requests.
         // Create a snapshot actor. This actor will act as a worker to offload snapshot serialization for all
         // requests.
-        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(), snapshotActorName);
+        final ActorRef snapshotActor = actorContext.actorOf(ShardSnapshotActor.props(streamFactory),
+                snapshotActorName);
 
 
-        return new ShardSnapshotCohort(applyHistoryId, snapshotActor, store, log, logId);
+        return new ShardSnapshotCohort(streamFactory, applyHistoryId, snapshotActor, store, log, logId);
     }
 
     @Override
     }
 
     @Override
@@ -99,7 +105,7 @@ final class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     @Override
     public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
 
     @Override
     public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
-        try (ObjectInputStream in = new ObjectInputStream(snapshotBytes.openStream())) {
+        try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(snapshotBytes))) {
             return ShardDataTreeSnapshot.deserialize(in);
         }
     }
             return ShardDataTreeSnapshot.deserialize(in);
         }
     }
index bdc2ec48d7ba12bbe373f25f8db096c7e8926ef1..d7d380830fd81b34502fe606ce3ca1e63da1113a 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Optional;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 
 /**
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 
 /**
@@ -56,8 +57,11 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     //actor name override used for metering. This does not change the "real" actor name
     private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
 
     //actor name override used for metering. This does not change the "real" actor name
     private static final String ACTOR_NAME_FOR_METERING = "shard-snapshot";
 
-    private ShardSnapshotActor() {
+    private final InputOutputStreamFactory streamFactory;
+
+    private ShardSnapshotActor(final InputOutputStreamFactory streamFactory) {
         super(ACTOR_NAME_FOR_METERING);
         super(ACTOR_NAME_FOR_METERING);
+        this.streamFactory = requireNonNull(streamFactory);
     }
 
     @Override
     }
 
     @Override
@@ -72,7 +76,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
     private void onSerializeSnapshot(final SerializeSnapshot request) {
         Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
         if (installSnapshotStream.isPresent()) {
     private void onSerializeSnapshot(final SerializeSnapshot request) {
         Optional<OutputStream> installSnapshotStream = request.getInstallSnapshotStream();
         if (installSnapshotStream.isPresent()) {
-            try (ObjectOutputStream out = new ObjectOutputStream(installSnapshotStream.get())) {
+            try (ObjectOutputStream out = getOutputStream(installSnapshotStream.get())) {
                 request.getSnapshot().serialize(out);
             } catch (IOException e) {
                 // TODO - we should communicate the failure in the CaptureSnapshotReply.
                 request.getSnapshot().serialize(out);
             } catch (IOException e) {
                 // TODO - we should communicate the failure in the CaptureSnapshotReply.
@@ -84,6 +88,10 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
                 installSnapshotStream), ActorRef.noSender());
     }
 
                 installSnapshotStream), ActorRef.noSender());
     }
 
+    private ObjectOutputStream getOutputStream(final OutputStream outputStream) throws IOException {
+        return new ObjectOutputStream(streamFactory.wrapOutputStream(outputStream));
+    }
+
     /**
      * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
      *
     /**
      * Sends a request to a ShardSnapshotActor to process a snapshot and send a CaptureSnapshotReply.
      *
@@ -98,7 +106,7 @@ public final class ShardSnapshotActor extends AbstractUntypedActorWithMetering {
         snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
         snapshotActor.tell(new SerializeSnapshot(snapshot, installSnapshotStream, replyTo), ActorRef.noSender());
     }
 
-    public static Props props() {
-        return Props.create(ShardSnapshotActor.class);
+    public static Props props(final InputOutputStreamFactory streamFactory) {
+        return Props.create(ShardSnapshotActor.class, streamFactory);
     }
 }
     }
 }
index acb22256ce26f88cf8a57b61ca7857703737380c..f0d6693f5815d5f15016f1bbcfa87b7675b83ae6 100644 (file)
@@ -286,6 +286,13 @@ module distributed-datastore-provider {
             type non-zero-uint32-type;
             description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
         }
             type non-zero-uint32-type;
             description "The initial buffer capacity, in bytes, to use when serializing message payloads.";
         }
+
+        leaf use-lz4-compression {
+            default false;
+            type boolean;
+            description "Use lz4 compression for snapshots, sent from leader to follower, for snapshots stored
+                        by LocalSnapshotStore, use akka.conf configuration.";
+        }
     }
 
     container data-store-properties-container {
     }
 
     container data-store-properties-container {
index 6a0def0901ac70d413af19e201793fb4311d75b8..ae7c9b496341696c762501a47d9f86b6b3b647da 100644 (file)
@@ -13,7 +13,7 @@ import static org.junit.Assert.assertTrue;
 
 import akka.actor.ActorRef;
 import akka.testkit.javadsl.TestKit;
 
 import akka.actor.ActorRef;
 import akka.testkit.javadsl.TestKit;
-import java.io.ByteArrayInputStream;
+import com.google.common.io.ByteSource;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.time.Duration;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.time.Duration;
@@ -23,18 +23,21 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 public class ShardSnapshotActorTest extends AbstractActorTest {
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 public class ShardSnapshotActorTest extends AbstractActorTest {
+    private static final InputOutputStreamFactory STREAM_FACTORY = InputOutputStreamFactory.simple();
+
     private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
     private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
             final boolean withInstallSnapshot) throws Exception {
         final TestKit kit = new TestKit(getSystem());
     private static final NormalizedNode<?, ?> DATA = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
     private static void testSerializeSnapshot(final String testName, final ShardDataTreeSnapshot snapshot,
             final boolean withInstallSnapshot) throws Exception {
         final TestKit kit = new TestKit(getSystem());
-        final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(), testName);
+        final ActorRef snapshotActor = getSystem().actorOf(ShardSnapshotActor.props(STREAM_FACTORY), testName);
         kit.watch(snapshotActor);
 
         final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
         kit.watch(snapshotActor);
 
         final NormalizedNode<?, ?> expectedRoot = snapshot.getRootNode().get();
@@ -50,8 +53,8 @@ public class ShardSnapshotActorTest extends AbstractActorTest {
 
         if (installSnapshotStream != null) {
             final ShardDataTreeSnapshot deserialized;
 
         if (installSnapshotStream != null) {
             final ShardDataTreeSnapshot deserialized;
-            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(
-                installSnapshotStream.toByteArray()))) {
+            try (ObjectInputStream in = new ObjectInputStream(STREAM_FACTORY.createInputStream(
+                    ByteSource.wrap(installSnapshotStream.toByteArray())))) {
                 deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
             }
 
                 deserialized = ShardDataTreeSnapshot.deserialize(in).getSnapshot();
             }