Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / persistence / LocalSnapshotStore.java
index 3e170fd326c4f93272374e34ef2a74bbc2d5277b..76babc15db7d831403e985c658e3a3cef46f3e99 100644 (file)
@@ -15,6 +15,7 @@ import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.serialization.Snapshot;
 import akka.persistence.serialization.SnapshotSerializer;
 import akka.persistence.snapshot.japi.SnapshotStore;
+import akka.serialization.JavaSerializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
@@ -22,12 +23,12 @@ import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -39,10 +40,11 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
@@ -133,15 +135,17 @@ public class LocalSnapshotStore extends SnapshotStore {
     }
 
     private Object deserialize(final File file) throws IOException {
-        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
-            return in.readObject();
-        } catch (ClassNotFoundException e) {
-            throw new IOException("Error loading snapshot file " + file, e);
-        } catch (IOException e) {
-            LOG.debug("Error loading snapshot file {}", file, e);
-
-            return tryDeserializeAkkaSnapshot(file);
-        }
+        return JavaSerializer.currentSystem().withValue((ExtendedActorSystem) context().system(),
+            (Callable<Object>) () -> {
+                try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
+                    return in.readObject();
+                } catch (ClassNotFoundException e) {
+                    throw new IOException("Error loading snapshot file " + file, e);
+                } catch (IOException e) {
+                    LOG.debug("Error loading snapshot file {}", file, e);
+                    return tryDeserializeAkkaSnapshot(file);
+                }
+            });
     }
 
     private Object tryDeserializeAkkaSnapshot(final File file) throws IOException {
@@ -219,7 +223,7 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Deleting files: {}", files);
 
-        files.forEach(file -> file.delete());
+        files.forEach(File::delete);
         return null;
     }
 
@@ -228,14 +232,14 @@ public class LocalSnapshotStore extends SnapshotStore {
 
         LOG.debug("Deleting files: {}", files);
 
-        files.forEach(file -> file.delete());
+        files.forEach(File::delete);
         return null;
     }
 
     private Collection<File> getSnapshotFiles(final String persistenceId) {
         String encodedPersistenceId = encode(persistenceId);
 
-        File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+        File[] files = snapshotDir.listFiles((dir, name) -> {
             int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
             return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
                     && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
@@ -264,15 +268,14 @@ public class LocalSnapshotStore extends SnapshotStore {
     private Collection<SnapshotMetadata> getSnapshotMetadatas(final String persistenceId,
             final SnapshotSelectionCriteria criteria) {
         return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
-                .filter(md -> criteria.matches(md)).collect(Collectors.toList());
+                .filter(criteria::matches).collect(Collectors.toList());
     }
 
-    private static Stream<SnapshotMetadata> toStream(@Nullable final SnapshotMetadata md) {
+    private static Stream<SnapshotMetadata> toStream(final @Nullable SnapshotMetadata md) {
         return md != null ? Stream.of(md) : Stream.empty();
     }
 
-    @Nullable
-    private static SnapshotMetadata extractMetadata(final File file) {
+    private static @Nullable SnapshotMetadata extractMetadata(final File file) {
         String name = file.getName();
         int sequenceNumberEndIndex = name.lastIndexOf('-');
         int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
@@ -281,7 +284,9 @@ public class LocalSnapshotStore extends SnapshotStore {
         }
 
         try {
-            String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex);
+            // Since the persistenceId is url encoded in the filename, we need
+            // to decode relevant filename's part to obtain persistenceId back
+            String persistenceId = decode(name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex));
             long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
             long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
             return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
@@ -312,6 +317,16 @@ public class LocalSnapshotStore extends SnapshotStore {
         }
     }
 
+    private static String decode(final String str) {
+        try {
+            return URLDecoder.decode(str, StandardCharsets.UTF_8.name());
+        } catch (final UnsupportedEncodingException e) {
+            // Shouldn't happen
+            LOG.warn("Error decoding {}", str, e);
+            return str;
+        }
+    }
+
     @VisibleForTesting
     static int compare(final SnapshotMetadata m1, final SnapshotMetadata m2) {
         return (int) (!m1.persistenceId().equals(m2.persistenceId())