import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.io.InputOutputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
*/
public class LocalSnapshotStore extends SnapshotStore {
private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
-
private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
+ private final InputOutputStreamFactory streamFactory;
private final ExecutionContext executionContext;
private final int maxLoadAttempts;
private final File snapshotDir;
this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
snapshotDir = new File(config.getString("dir"));
- int localMaxLoadAttempts = config.getInt("max-load-attempts");
+ final int localMaxLoadAttempts = config.getInt("max-load-attempts");
maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
+ if (config.getBoolean("use-lz4-compression")) {
+ final String size = config.getString("lz4-blocksize");
+ streamFactory = InputOutputStreamFactory.lz4(size);
+ LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size);
+ } else {
+ streamFactory = InputOutputStreamFactory.simple();
+ LOG.debug("Using plain Input/Output Stream");
+ }
+
LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
}
private Object deserialize(final File file) throws IOException {
return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
(Callable<Object>) () -> {
- try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+ try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) {
return in.readObject();
} catch (ClassNotFoundException e) {
throw new IOException("Error loading snapshot file " + file, e);
LOG.debug("Saving to temp file: {}", temp);
- try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(temp)))) {
+ try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) {
out.writeObject(snapshot);
} catch (IOException e) {
LOG.error("Error saving snapshot file {}. Deleting file..", temp, e);