Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
index 42c90b80d4df0ca70e6fbb95ba8f5be9b8cadb6b..695f34ecee5fe7647fc310ee6f6572512c6ae57f 100644 (file)
@@ -22,10 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -48,6 +46,7 @@ import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -63,9 +62,9 @@ import scala.concurrent.Future;
  */
 public class LocalSnapshotStore extends SnapshotStore {
     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
 
+    private final InputOutputStreamFactory streamFactory;
     private final ExecutionContext executionContext;
     private final int maxLoadAttempts;
     private final File snapshotDir;
@@ -74,9 +73,18 @@ public class LocalSnapshotStore extends SnapshotStore {
         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
         snapshotDir = new File(config.getString("dir"));
 
-        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        final int localMaxLoadAttempts = config.getInt("max-load-attempts");
         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
 
+        if (config.getBoolean("use-lz4-compression")) {
+            final String size = config.getString("lz4-blocksize");
+            streamFactory = InputOutputStreamFactory.lz4(size);
+            LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+        } else {
+            streamFactory = InputOutputStreamFactory.simple();
+            LOG.debug("Using plain Input/Output Stream");
+        }
+
         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
     }
 
@@ -140,7 +148,7 @@ public class LocalSnapshotStore extends SnapshotStore {
     private Object deserialize(final File file) throws IOException {
         return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
             (Callable<Object>) () -> {
-                try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+                try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
                     return in.readObject();
                 } catch (ClassNotFoundException e) {
                     throw new IOException("Error loading snapshot file " + file, e);
@@ -178,7 +186,7 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Saving to temp file: {}", temp);
 
-        try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
+        try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
             out.writeObject(snapshot);
         } catch (IOException e) {
             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);