X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fpersistence%2FLocalSnapshotStore.java;h=62b0cf5bfd061867da72cd4084eb8eec86d72821;hb=5cc558ac2fa4d221e26b76df725bade5b44ee794;hp=457736d5c4a458bc766ccf6f6f39d56ef2ab12bf;hpb=ed6ec368fa9aa9b4cd770769e264c19ddc7549ea;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java index 457736d5c4..62b0cf5bfd 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java @@ -15,6 +15,7 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.persistence.serialization.Snapshot; import akka.persistence.serialization.SnapshotSerializer; import akka.persistence.snapshot.japi.SnapshotStore; +import akka.serialization.JavaSerializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.typesafe.config.Config; @@ -22,14 +23,16 @@ import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collection; @@ -37,10 +40,11 @@ import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Optional; +import java.util.concurrent.Callable; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; @@ -63,7 +67,7 @@ public class LocalSnapshotStore extends SnapshotStore { private final int maxLoadAttempts; private final File snapshotDir; - public LocalSnapshotStore(Config config) { + public LocalSnapshotStore(final Config config) { this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher")); snapshotDir = new File(config.getString("dir")); @@ -86,7 +90,8 @@ public class LocalSnapshotStore extends SnapshotStore { } @Override - public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(final String persistenceId, + final SnapshotSelectionCriteria criteria) { LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria); // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where @@ -106,9 +111,9 @@ public class LocalSnapshotStore extends SnapshotStore { return Futures.future(() -> doLoad(metadatas), executionContext); } - private Optional doLoad(Deque metadatas) throws IOException { + private Optional doLoad(final Deque metadatas) throws IOException { SnapshotMetadata metadata = metadatas.removeFirst(); - File file = toSnapshotFile(metadata, ""); + File file = toSnapshotFile(metadata); LOG.debug("doLoad {}", file); @@ -129,19 +134,21 @@ public class LocalSnapshotStore extends SnapshotStore { } } - private Object deserialize(File file) throws IOException { - try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) { - return in.readObject(); - } catch (ClassNotFoundException e) { - throw new IOException("Error loading snapshot file " + file, e); - } catch (IOException e) { - LOG.debug("Error loading snapshot file {}", file, e); - - return tryDeserializeAkkaSnapshot(file); - } + private Object deserialize(final File file) throws IOException { + return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(), + (Callable) () -> { + try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) { + return in.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException("Error loading snapshot file " + file, e); + } catch (IOException e) { + LOG.debug("Error loading snapshot file {}", file, e); + return tryDeserializeAkkaSnapshot(file); + } + }); } - private Object tryDeserializeAkkaSnapshot(File file) throws IOException { + private Object tryDeserializeAkkaSnapshot(final File file) throws IOException { LOG.debug("tryDeserializeAkkaSnapshot {}", file); // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data @@ -156,37 +163,44 @@ public class LocalSnapshotStore extends SnapshotStore { } @Override - public Future doSaveAsync(SnapshotMetadata metadata, Object snapshot) { + public Future doSaveAsync(final SnapshotMetadata metadata, final Object snapshot) { LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot); return Futures.future(() -> doSave(metadata, snapshot), executionContext); } - private Void doSave(SnapshotMetadata metadata, Object snapshot) throws IOException { - File temp = toSnapshotFile(metadata, ".tmp"); + private Void doSave(final SnapshotMetadata metadata, final Object snapshot) throws IOException { + final File actual = toSnapshotFile(metadata); + final File temp = File.createTempFile(actual.getName(), null, snapshotDir); LOG.debug("Saving to temp file: {}", temp); try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) { out.writeObject(snapshot); } catch (IOException e) { - LOG.error("Error saving snapshot file {}", temp, e); + LOG.error("Error saving snapshot file {}. Deleting file..", temp, e); + if (!temp.delete()) { + LOG.error("Failed to successfully delete file {}", temp); + } throw e; } - File actual = toSnapshotFile(metadata, ""); - LOG.debug("Renaming to: {}", actual); - - if (!temp.renameTo(actual)) { - throw new IOException(String.format("Failed to rename %s to %s", temp, actual)); + try { + Files.move(temp.toPath(), actual.toPath(), StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e) { + LOG.warn("Failed to move {} to {}. Deleting {}..", temp, actual, temp, e); + if (!temp.delete()) { + LOG.error("Failed to successfully delete file {}", temp); + } + throw e; } return null; } @Override - public Future doDeleteAsync(SnapshotMetadata metadata) { + public Future doDeleteAsync(final SnapshotMetadata metadata) { LOG.debug("In doDeleteAsync - metadata: {}", metadata); // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them. @@ -197,35 +211,47 @@ public class LocalSnapshotStore extends SnapshotStore { } @Override - public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future doDeleteAsync(final String persistenceId, final SnapshotSelectionCriteria criteria) { LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria); return Futures.future(() -> doDelete(persistenceId, criteria), executionContext); } - private Void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) { + private Void doDelete(final String persistenceId, final SnapshotSelectionCriteria criteria) { final List files = getSnapshotMetadatas(persistenceId, criteria).stream() - .flatMap(md -> Stream.of(toSnapshotFile(md, ""))).collect(Collectors.toList()); + .flatMap(md -> Stream.of(toSnapshotFile(md))).collect(Collectors.toList()); LOG.debug("Deleting files: {}", files); - files.forEach(file -> file.delete()); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}, persistenceId: {} ", file, persistenceId); + } + }); return null; } - private Void doDelete(SnapshotMetadata metadata) { + private Void doDelete(final SnapshotMetadata metadata) { final Collection files = getSnapshotFiles(metadata); LOG.debug("Deleting files: {}", files); - files.forEach(file -> file.delete()); + files.forEach(file -> { + try { + Files.delete(file.toPath()); + } catch (IOException | SecurityException e) { + LOG.error("Unable to delete snapshot file: {}", file); + } + }); return null; } - private Collection getSnapshotFiles(String persistenceId) { + private Collection getSnapshotFiles(final String persistenceId) { String encodedPersistenceId = encode(persistenceId); - File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> { + File[] files = snapshotDir.listFiles((dir, name) -> { int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1); return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp"); @@ -243,7 +269,7 @@ public class LocalSnapshotStore extends SnapshotStore { return Arrays.asList(files); } - private Collection getSnapshotFiles(SnapshotMetadata metadata) { + private Collection getSnapshotFiles(final SnapshotMetadata metadata) { return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> { SnapshotMetadata possible = extractMetadata(file); return possible != null && possible.sequenceNr() == metadata.sequenceNr() @@ -251,18 +277,17 @@ public class LocalSnapshotStore extends SnapshotStore { }).collect(Collectors.toList()); } - private Collection getSnapshotMetadatas(String persistenceId, - SnapshotSelectionCriteria criteria) { + private Collection getSnapshotMetadatas(final String persistenceId, + final SnapshotSelectionCriteria criteria) { return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file))) - .filter(md -> criteria.matches(md)).collect(Collectors.toList()); + .filter(criteria::matches).collect(Collectors.toList()); } - private static Stream toStream(@Nullable SnapshotMetadata md) { + private static Stream toStream(final @Nullable SnapshotMetadata md) { return md != null ? Stream.of(md) : Stream.empty(); } - @Nullable - private static SnapshotMetadata extractMetadata(File file) { + private static @Nullable SnapshotMetadata extractMetadata(final File file) { String name = file.getName(); int sequenceNumberEndIndex = name.lastIndexOf('-'); int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1); @@ -271,7 +296,9 @@ public class LocalSnapshotStore extends SnapshotStore { } try { - String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex); + // Since the persistenceId is url encoded in the filename, we need + // to decode relevant filename's part to obtain persistenceId back + String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex)); long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex)); long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1)); return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp); @@ -280,9 +307,9 @@ public class LocalSnapshotStore extends SnapshotStore { } } - private File toSnapshotFile(SnapshotMetadata metadata, String extension) { - return new File(snapshotDir, String.format("snapshot-%s-%d-%d%s", encode(metadata.persistenceId()), - metadata.sequenceNr(), metadata.timestamp(), extension)); + private File toSnapshotFile(final SnapshotMetadata metadata) { + return new File(snapshotDir, String.format("snapshot-%s-%d-%d", encode(metadata.persistenceId()), + metadata.sequenceNr(), metadata.timestamp())); } private static Collector> reverse() { @@ -292,7 +319,7 @@ public class LocalSnapshotStore extends SnapshotStore { }); } - private String encode(String str) { + private static String encode(final String str) { try { return URLEncoder.encode(str, StandardCharsets.UTF_8.name()); } catch (UnsupportedEncodingException e) { @@ -302,8 +329,18 @@ public class LocalSnapshotStore extends SnapshotStore { } } + private static String decode(final String str) { + try { + return URLDecoder.decode(str, StandardCharsets.UTF_8.name()); + } catch (final UnsupportedEncodingException e) { + // Shouldn't happen + LOG.warn("Error decoding {}", str, e); + return str; + } + } + @VisibleForTesting - static int compare(SnapshotMetadata m1, SnapshotMetadata m2) { + static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) { return (int) (!m1.persistenceId().equals(m2.persistenceId()) ? m1.persistenceId().compareTo(m2.persistenceId()) : m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :