Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
index c89d7ffd45c062b245e6276ae9d9f02658350df4..695f34ecee5fe7647fc310ee6f6572512c6ae57f 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.persistence;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import akka.actor.ExtendedActorSystem;
 import akka.dispatch.Futures;
 import akka.persistence.SelectedSnapshot;
@@ -15,14 +17,13 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.serialization.Snapshot;
 import akka.persistence.serialization.SnapshotSerializer;
 import akka.persistence.snapshot.japi.SnapshotStore;
+import akka.serialization.JavaSerializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -40,10 +41,12 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -59,9 +62,9 @@ import scala.concurrent.Future;
  */
 public class LocalSnapshotStore extends SnapshotStore {
     private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
     private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
 
+    private final InputOutputStreamFactory streamFactory;
     private final ExecutionContext executionContext;
     private final int maxLoadAttempts;
     private final File snapshotDir;
@@ -70,9 +73,18 @@ public class LocalSnapshotStore extends SnapshotStore {
         this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
         snapshotDir = new File(config.getString("dir"));
 
-        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        final int localMaxLoadAttempts = config.getInt("max-load-attempts");
         maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
 
+        if (config.getBoolean("use-lz4-compression")) {
+            final String size = config.getString("lz4-blocksize");
+            streamFactory = InputOutputStreamFactory.lz4(size);
+            LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+        } else {
+            streamFactory = InputOutputStreamFactory.simple();
+            LOG.debug("Using plain Input/Output Stream");
+        }
+
         LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
     }
 
@@ -134,15 +146,17 @@ public class LocalSnapshotStore extends SnapshotStore {
     }
 
     private Object deserialize(final File file) throws IOException {
-        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
-            return in.readObject();
-        } catch (ClassNotFoundException e) {
-            throw new IOException("Error loading snapshot file " + file, e);
-        } catch (IOException e) {
-            LOG.debug("Error loading snapshot file {}", file, e);
-
-            return tryDeserializeAkkaSnapshot(file);
-        }
+        return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
+            (Callable<Object>) () -> {
+                try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
+                    return in.readObject();
+                } catch (ClassNotFoundException e) {
+                    throw new IOException("Error loading snapshot file " + file, e);
+                } catch (IOException e) {
+                    LOG.debug("Error loading snapshot file {}", file, e);
+                    return tryDeserializeAkkaSnapshot(file);
+                }
+            });
     }
 
     private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
@@ -172,7 +186,7 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Saving to temp file: {}", temp);
 
-        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
+        try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
             out.writeObject(snapshot);
         } catch (IOException e) {
             LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);
@@ -220,7 +234,13 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Deleting files: {}", files);
 
-        files.forEach(File::delete);
+        files.forEach(file -> {
+            try {
+                Files.delete(file.toPath());
+            } catch (IOException | SecurityException e) {
+                LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId);
+            }
+        });
         return null;
     }
 
@@ -229,14 +249,20 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Deleting files: {}", files);
 
-        files.forEach(File::delete);
+        files.forEach(file -> {
+            try {
+                Files.delete(file.toPath());
+            } catch (IOException | SecurityException e) {
+                LOG.error("Unable to delete snapshot file: {}", file);
+            }
+        });
         return null;
     }
 
     private Collection<File> getSnapshotFiles(final String persistenceId) {
         String encodedPersistenceId = encode(persistenceId);
 
-        File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+        File[] files = snapshotDir.listFiles((dir, name) -> {
             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
@@ -268,12 +294,11 @@ public class LocalSnapshotStore extends SnapshotStore {
                 .filter(criteria::matches).collect(Collectors.toList());
     }
 
-    private static Stream<SnapshotMetadata> toStream(@Nullable final SnapshotMetadata md) {
+    private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
         return md != null ? Stream.of(md) : Stream.empty();
     }
 
-    @Nullable
-    private static SnapshotMetadata extractMetadata(final File file) {
+    private static @Nullable SnapshotMetadata extractMetadata(final File file) {
         String name = file.getName();
         int sequenceNumberEndIndex = name.lastIndexOf('-');
         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
@@ -327,9 +352,9 @@ public class LocalSnapshotStore extends SnapshotStore {
 
     @VisibleForTesting
     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
-        return (int) (!m1.persistenceId().equals(m2.persistenceId())
-                ? m1.persistenceId().compareTo(m2.persistenceId()) :
-            m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
-                m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
+        checkArgument(m1.persistenceId().equals(m2.persistenceId()),
+                "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId());
+        final int cmp = Long.compare(m1.timestamp(), m2.timestamp());
+        return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr());
     }
 }