X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fpersistence%2FLocalSnapshotStore.java;h=75eb7fbd6f142e444dfe187417bde25b2d685773;hb=e84f63ee098fff5b02cbce1281ca0d1208f966fa;hp=c89d7ffd45c062b245e6276ae9d9f02658350df4;hpb=7f2be8742ecdb75e92ce910042530ec7fdb43526;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java index c89d7ffd45..75eb7fbd6f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.persistence; +import static com.google.common.base.Preconditions.checkArgument; + import akka.actor.ExtendedActorSystem; import akka.dispatch.Futures; import akka.persistence.SelectedSnapshot; @@ -15,19 +17,18 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.serialization.Snapshot; import akka.persistence.serialization.SnapshotSerializer; import akka.persistence.snapshot.japi.SnapshotStore; +import akka.serialization.JavaSerializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.typesafe.config.Config; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -40,10 +41,12 @@ import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Optional; +import java.util.concurrent.Callable; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.io.InputOutputStreamFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -59,20 +62,30 @@ import scala.concurrent.Future; */ public class LocalSnapshotStore extends SnapshotStore { private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class); - private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length(); + private final InputOutputStreamFactory streamFactory; private final ExecutionContext executionContext; private final int maxLoadAttempts; private final File snapshotDir; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") public LocalSnapshotStore(final Config config) { - this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher")); + executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher")); snapshotDir = new File(config.getString("dir")); - int localMaxLoadAttempts = config.getInt("max-load-attempts"); + final int localMaxLoadAttempts = config.getInt("max-load-attempts"); maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1; + if (config.getBoolean("use-lz4-compression")) { + final String size = config.getString("lz4-blocksize"); + streamFactory = InputOutputStreamFactory.lz4(size); + LOG.debug("Using LZ4 Input/Output Stream, blocksize: {}", size); + } else { + streamFactory = InputOutputStreamFactory.simple(); + LOG.debug("Using plain Input/Output Stream"); + } + LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts); } @@ -134,15 +147,17 @@ public class LocalSnapshotStore extends SnapshotStore { } private Object deserialize(final File file) throws IOException { - try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) { - return in.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Error loading snapshot file " + file, e); - } catch (IOException e) { - LOG.debug("Error loading snapshot file {}", file, e); - - return tryDeserializeAkkaSnapshot(file); - } + return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(), + (Callable) () -> { + try (ObjectInputStream in = new ObjectInputStream(streamFactory.createInputStream(file))) { + return in.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException("Error loading snapshot file " + file, e); + } catch (IOException e) { + LOG.debug("Error loading snapshot file {}", file, e); + return tryDeserializeAkkaSnapshot(file); + } + }); } private Object tryDeserializeAkkaSnapshot(final File file) throws IOException { @@ -172,7 +187,7 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Saving to temp file: {}", temp); - try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) { + try (ObjectOutputStream out = new ObjectOutputStream(streamFactory.createOutputStream(temp))) { out.writeObject(snapshot); } catch (IOException e) { LOG.error("Error saving snapshot file {}. Deleting file..", temp, e); @@ -220,7 +235,13 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Deleting files: {}", files); - files.forEach(File::delete); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId); + } + }); return null; } @@ -229,14 +250,20 @@ public class LocalSnapshotStore extends SnapshotStore { LOG.debug("Deleting files: {}", files); - files.forEach(File::delete); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}", file); + } + }); return null; } private Collection getSnapshotFiles(final String persistenceId) { String encodedPersistenceId = encode(persistenceId); - File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> { + File[] files = snapshotDir.listFiles((dir, name) -> { int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1); return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp"); @@ -268,12 +295,11 @@ public class LocalSnapshotStore extends SnapshotStore { .filter(criteria::matches).collect(Collectors.toList()); } - private static Stream toStream(@Nullable final SnapshotMetadata md) { + private static Stream toStream(final @Nullable SnapshotMetadata md) { return md != null ? Stream.of(md) : Stream.empty(); } - @Nullable - private static SnapshotMetadata extractMetadata(final File file) { + private static @Nullable SnapshotMetadata extractMetadata(final File file) { String name = file.getName(); int sequenceNumberEndIndex = name.lastIndexOf('-'); int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1); @@ -306,30 +332,18 @@ public class LocalSnapshotStore extends SnapshotStore { } private static String encode(final String str) { - try { - return URLEncoder.encode(str, StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - // Shouldn't happen - LOG.warn("Error encoding {}", str, e); - return str; - } + return URLEncoder.encode(str, StandardCharsets.UTF_8); } private static String decode(final String str) { - try { - return URLDecoder.decode(str, StandardCharsets.UTF_8.name()); - } catch (final UnsupportedEncodingException e) { - // Shouldn't happen - LOG.warn("Error decoding {}", str, e); - return str; - } + return URLDecoder.decode(str, StandardCharsets.UTF_8); } @VisibleForTesting static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) { - return (int) (!m1.persistenceId().equals(m2.persistenceId()) - ? m1.persistenceId().compareTo(m2.persistenceId()) : - m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() : - m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0); + checkArgument(m1.persistenceId().equals(m2.persistenceId()), + "Persistence id does not match. id1: %s, id2: %s", m1.persistenceId(), m2.persistenceId()); + final int cmp = Long.compare(m1.timestamp(), m2.timestamp()); + return cmp != 0 ? cmp : Long.compare(m1.sequenceNr(), m2.sequenceNr()); } }