X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fpersistence%2FLocalSnapshotStore.java;h=75eb7fbd6f142e444dfe187417bde25b2d685773;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hp=1ceba1cd1320e9790e99f44a046e0522d592c319;hpb=466078ab1dc8a8cc2981b161051f6edecd6af85a;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java index 1ceba1cd13..75eb7fbd6f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.persistence; +import static com.google.common.base.Preconditions.checkArgument; + import akka.actor.ExtendedActorSystem; import akka.dispatch.Futures; import akka.persistence.SelectedSnapshot; @@ -19,15 +21,14 @@ import akka.serialization.JavaSerializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.typesafe.config.Config; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -44,7 +45,8 @@ import java.util.concurrent.Callable; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -60,20 +62,30 @@ import scala.concurrent.Future; */ public class LocalSnapshotStore extends SnapshotStore { private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class); - private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length(); + private final InputOutputStreamFactory streamFactory; private final ExecutionContext executionContext; private final int maxLoadAttempts; private final File snapshotDir; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") public LocalSnapshotStore(final Config config) { - this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher")); + executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher")); snapshotDir = new File(config.getString("dir")); - int localMaxLoadAttempts = config.getInt("max-load-attempts"); + final int localMaxLoadAttempts = config.getInt("max-load-attempts"); maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1; + if (config.getBoolean("use-lz4-compression")) { + final String size = config.getString("lz4-blocksize"); + streamFactory = InputOutputStreamFactory.lz4(size); + LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size); + } else { + streamFactory = InputOutputStreamFactory.simple(); + LOG.debug("Using plain Input/Output Stream"); + } + LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts); } @@ -137,7 +149,7 @@ public class LocalSnapshotStore extends SnapshotStore { private Object deserialize(final File file) throws IOException { return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(), (Callable) () -> { - try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) { + try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) { return in.readObject(); } catch (ClassNotFoundException e) { throw new IOException("Error loading snapshot file " + file, e); @@ -175,7 +187,7 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Saving to temp file: {}", temp); - try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) { + try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) { out.writeObject(snapshot); } catch (IOException e) { LOG.error("Error saving snapshot file {}. Deleting file..", temp, e); @@ -223,7 +235,13 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Deleting files: {}", files); - files.forEach(File::delete); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId); + } + }); return null; } @@ -232,7 +250,13 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Deleting files: {}", files); - files.forEach(File::delete); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}", file); + } + }); return null; } @@ -271,12 +295,11 @@ public class LocalSnapshotStore extends SnapshotStore { .filter(criteria::matches).collect(Collectors.toList()); } - private static Stream toStream(@Nullable final SnapshotMetadata md) { + private static Stream toStream(final @Nullable SnapshotMetadata md) { return md != null ? Stream.of(md) : Stream.empty(); } - @Nullable - private static SnapshotMetadata extractMetadata(final File file) { + private static @Nullable SnapshotMetadata extractMetadata(final File file) { String name = file.getName(); int sequenceNumberEndIndex = name.lastIndexOf('-'); int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1); @@ -309,30 +332,18 @@ public class LocalSnapshotStore extends SnapshotStore { } private static String encode(final String str) { - try { - return URLEncoder.encode(str, StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - // Shouldn't happen - LOG.warn("Error encoding {}", str, e); - return str; - } + return URLEncoder.encode(str, StandardCharsets.UTF_8); } private static String decode(final String str) { - try { - return URLDecoder.decode(str, StandardCharsets.UTF_8.name()); - } catch (final UnsupportedEncodingException e) { - // Shouldn't happen - LOG.warn("Error decoding {}", str, e); - return str; - } + return URLDecoder.decode(str, StandardCharsets.UTF_8); } @VisibleForTesting static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) { - return (int) (!m1.persistenceId().equals(m2.persistenceId()) - ? m1.persistenceId().compareTo(m2.persistenceId()) : - m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() : - m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0); + checkArgument(m1.persistenceId().equals(m2.persistenceId()), + "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId()); + final int cmp = Long.compare(m1.timestamp(), m2.timestamp()); + return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr()); } }