Bug 7521: Add custom local snapshot store 78/51078/16
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Jan 2017 16:49:43 +0000 (11:49 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 28 Feb 2017 21:01:55 +0000 (16:01 -0500)
Akka's LocalSnapshotStore serializes to a byte[] before persisting
to the file so we need to write our LocalSnapshotStore that serializes
directly to the file. I patterned the code after akka's LocalSnapshotStore,
mainly translated from scala.

Akka's LocalSnapshotStore wraps the payload data in a Snapshot class
and uses the SnapshotSerializer which locates the serializer for the payload
class and writes some header data before delegating to the payload
serializer. To handle backwards compatibility for a snapshot previously
serialized with akka's format, java de-serialization will detect and fail
with an invalid stream header, in which case we fall back and try
de-serialization via the SnapshotSerializer.

Akka has a standard test suite in SnapshotStoreSpec for testing custom
snapshot store plugins. I derived a LocalSnapshotStoreSpecTest class
that does the setup and teardown with SnapshotStoreSpec doing the rest.
SnapshotStoreSpec uses ScalaTest stuff so need to be run with scala's
JUnitRunner.

I also added a regular LocalSnapshotStoreTest class to cover a few cases
that SnapshotStoreSpec doesn't.

Change-Id: I1ca11682f37aa39d60d3ce57c874c299627e8ca6
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf

index 899b8b875bd3539ef0f6c167a461c1764b2a62d4..899b8e34f854ee4de2cd6a6c45b491c901de40c5 100644 (file)
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+
 
     <!-- Akka -->
     <dependency>
     <dependency>
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_${scala.version}</artifactId>
-      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-persistence-tck_${scala.version}</artifactId>
     </dependency>
 
     <!-- Codahale -->
               <instructions>
                   <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
                   <Export-Package>org.opendaylight.controller.cluster.schema.provider.impl, {local-packages}</Export-Package>
+                  <DynamicImport-Package>*</DynamicImport-Package>
               </instructions>
               </configuration>
           </plugin>
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
new file mode 100644 (file)
index 0000000..457736d
--- /dev/null
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import akka.actor.ExtendedActorSystem;
+import akka.dispatch.Futures;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.serialization.Snapshot;
+import akka.persistence.serialization.SnapshotSerializer;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+/**
+ * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
+ * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
+ * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
+ * by serializing the data directly to the file.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalSnapshotStore extends SnapshotStore {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
+
+    private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
+
+    private final ExecutionContext executionContext;
+    private final int maxLoadAttempts;
+    private final File snapshotDir;
+
+    public LocalSnapshotStore(Config config) {
+        this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
+        snapshotDir = new File(config.getString("dir"));
+
+        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
+
+        LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        if (!snapshotDir.isDirectory()) {
+            // Try to create the directory, on failure double check if someone else beat us to it.
+            if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
+                throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
+            }
+        }
+
+        super.preStart();
+    }
+
+    @Override
+    public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+        LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
+
+        // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
+        // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
+        // will fail but loading an older snapshot may succeed.
+
+        Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
+                .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
+                    .collect(Collectors.toCollection(ArrayDeque::new));
+
+        if (metadatas.isEmpty()) {
+            return Futures.successful(Optional.empty());
+        }
+
+        LOG.debug("doLoadAsync - found: {}", metadatas);
+
+        return Futures.future(() -> doLoad(metadatas), executionContext);
+    }
+
+    private Optional<SelectedSnapshot> doLoad(Deque<SnapshotMetadata> metadatas) throws IOException {
+        SnapshotMetadata metadata = metadatas.removeFirst();
+        File file = toSnapshotFile(metadata, "");
+
+        LOG.debug("doLoad {}", file);
+
+        try {
+            Object data = deserialize(file);
+
+            LOG.debug("deserialized data: {}", data);
+
+            return Optional.of(new SelectedSnapshot(metadata, data));
+        } catch (IOException e) {
+            LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
+
+            if (metadatas.isEmpty()) {
+                throw e;
+            }
+
+            return doLoad(metadatas);
+        }
+    }
+
+    private Object deserialize(File file) throws IOException {
+        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
+            return in.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Error loading snapshot file " + file, e);
+        } catch (IOException e) {
+            LOG.debug("Error loading snapshot file {}", file, e);
+
+            return tryDeserializeAkkaSnapshot(file);
+        }
+    }
+
+    private Object tryDeserializeAkkaSnapshot(File file) throws IOException {
+        LOG.debug("tryDeserializeAkkaSnapshot {}", file);
+
+        // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
+        // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
+        // the SnapshotSerializer to try to de-serialize it.
+
+        SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
+            return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
+        }
+    }
+
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
+        LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
+
+        return Futures.future(() -> doSave(metadata, snapshot), executionContext);
+    }
+
+    private Void doSave(SnapshotMetadata metadata, Object snapshot) throws IOException {
+        File temp = toSnapshotFile(metadata, ".tmp");
+
+        LOG.debug("Saving to temp file: {}", temp);
+
+        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
+            out.writeObject(snapshot);
+        } catch (IOException e) {
+            LOG.error("Error saving snapshot file {}", temp, e);
+            throw e;
+        }
+
+        File actual = toSnapshotFile(metadata, "");
+
+        LOG.debug("Renaming to: {}", actual);
+
+        if (!temp.renameTo(actual)) {
+            throw new IOException(String.format("Failed to rename %s to %s", temp, actual));
+        }
+
+        return null;
+    }
+
+    @Override
+    public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+        LOG.debug("In doDeleteAsync - metadata: {}", metadata);
+
+        // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
+        // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
+        // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
+
+        return Futures.future(() -> doDelete(metadata), executionContext);
+    }
+
+    @Override
+    public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+        LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
+
+        return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
+    }
+
+    private Void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) {
+        final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
+                .flatMap(md -> Stream.of(toSnapshotFile(md, ""))).collect(Collectors.toList());
+
+        LOG.debug("Deleting files: {}", files);
+
+        files.forEach(file -> file.delete());
+        return null;
+    }
+
+    private Void doDelete(SnapshotMetadata metadata) {
+        final Collection<File> files = getSnapshotFiles(metadata);
+
+        LOG.debug("Deleting files: {}", files);
+
+        files.forEach(file -> file.delete());
+        return null;
+    }
+
+    private Collection<File> getSnapshotFiles(String persistenceId) {
+        String encodedPersistenceId = encode(persistenceId);
+
+        File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+            int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
+            return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
+                    && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
+        });
+
+        if (files == null) {
+            return Collections.emptyList();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
+                    Arrays.toString(files));
+        }
+
+        return Arrays.asList(files);
+    }
+
+    private Collection<File> getSnapshotFiles(SnapshotMetadata metadata) {
+        return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
+            SnapshotMetadata possible = extractMetadata(file);
+            return possible != null && possible.sequenceNr() == metadata.sequenceNr()
+                    && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
+        }).collect(Collectors.toList());
+    }
+
+    private Collection<SnapshotMetadata> getSnapshotMetadatas(String persistenceId,
+            SnapshotSelectionCriteria criteria) {
+        return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
+                .filter(md -> criteria.matches(md)).collect(Collectors.toList());
+    }
+
+    private static Stream<SnapshotMetadata> toStream(@Nullable SnapshotMetadata md) {
+        return md != null ? Stream.of(md) : Stream.empty();
+    }
+
+    @Nullable
+    private static SnapshotMetadata extractMetadata(File file) {
+        String name = file.getName();
+        int sequenceNumberEndIndex = name.lastIndexOf('-');
+        int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
+        if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
+            return null;
+        }
+
+        try {
+            String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex);
+            long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
+            long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
+            return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    private File toSnapshotFile(SnapshotMetadata metadata, String extension) {
+        return new File(snapshotDir, String.format("snapshot-%s-%d-%d%s", encode(metadata.persistenceId()),
+                metadata.sequenceNr(), metadata.timestamp(), extension));
+    }
+
+    private static <T> Collector<T, ?, List<T>> reverse() {
+        return Collectors.collectingAndThen(Collectors.toList(), list -> {
+            Collections.reverse(list);
+            return list;
+        });
+    }
+
+    private String encode(String str) {
+        try {
+            return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
+        } catch (UnsupportedEncodingException e) {
+            // Shouldn't happen
+            LOG.warn("Error encoding {}", str, e);
+            return str;
+        }
+    }
+
+    @VisibleForTesting
+    static int compare(SnapshotMetadata m1, SnapshotMetadata m2) {
+        return (int) (!m1.persistenceId().equals(m2.persistenceId())
+                ? m1.persistenceId().compareTo(m2.persistenceId()) :
+            m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
+                m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java
new file mode 100644 (file)
index 0000000..7b70063
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import akka.persistence.snapshot.SnapshotStoreSpec;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.RunWith;
+import org.scalatest.junit.JUnitRunner;
+
+/**
+ * Tests the LocalSnapshotStore using akka's standard test suite for snapshot store plugins via SnapshotStoreSpec.
+ * This class basically does the setup and tear down with SnapshotStoreSpec doing the rest. SnapshotStoreSpec uses
+ * ScalaTest so needs to be run with scala's JUnitRunner.
+ *
+ * @author Thomas Pantelis
+ */
+@RunWith(JUnitRunner.class)
+public class LocalSnapshotStoreSpecTest extends SnapshotStoreSpec {
+    private static final long serialVersionUID = 1L;
+    static final File SNAPSHOT_DIR = new File("target/snapshots");
+
+    public LocalSnapshotStoreSpecTest() {
+        super(ConfigFactory.load("LocalSnapshotStoreTest.conf"));
+    }
+
+    @Override
+    public void afterAll() {
+        super.afterAll();
+        FileUtils.deleteQuietly(SNAPSHOT_DIR);
+    }
+
+    static void cleanSnapshotDir() {
+        if (!SNAPSHOT_DIR.exists()) {
+            return;
+        }
+
+        try {
+            FileUtils.cleanDirectory(SNAPSHOT_DIR);
+        } catch (IOException e) {
+            // Ignore
+        }
+    }
+
+    static void createSnapshotDir() {
+        if (!SNAPSHOT_DIR.exists() && !SNAPSHOT_DIR.mkdirs()) {
+            throw new RuntimeException("Failed to create " + SNAPSHOT_DIR);
+        }
+
+        cleanSnapshotDir();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java
new file mode 100644 (file)
index 0000000..964b663
--- /dev/null
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.SNAPSHOT_DIR;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.cleanSnapshotDir;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.createSnapshotDir;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.persistence.Persistence;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotProtocol;
+import akka.persistence.SnapshotProtocol.LoadSnapshot;
+import akka.persistence.SnapshotProtocol.LoadSnapshotFailed;
+import akka.persistence.SnapshotProtocol.LoadSnapshotResult;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.serialization.Snapshot;
+import akka.persistence.serialization.SnapshotSerializer;
+import akka.testkit.JavaTestKit;
+import com.google.common.base.Throwables;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+import scala.Option;
+
+/**
+ * Unit tests for LocalSnapshotStore. These are in addition to LocalSnapshotStoreSpecTest to cover a few cases
+ * that SnapshotStoreSpec doesn't.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalSnapshotStoreTest {
+    private static final String PERSISTENCE_ID = "member-1-shard-default-config";
+
+    private static ActorSystem system;
+    private static ActorRef snapshotStore;
+
+    @BeforeClass
+    public static void staticSetup() {
+        createSnapshotDir();
+
+        system = ActorSystem.create("test", ConfigFactory.load("LocalSnapshotStoreTest.conf"));
+        snapshotStore = system.registerExtension(Persistence.lookup()).snapshotStoreFor(null);
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        FileUtils.deleteQuietly(SNAPSHOT_DIR);
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        cleanSnapshotDir();
+    }
+
+    @After
+    public void cleanup() {
+        cleanSnapshotDir();
+    }
+
+    @Test
+    public void testDoLoadAsync() throws IOException {
+        createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
+        createSnapshotFile(PERSISTENCE_ID, "two", 1, 2000);
+        createSnapshotFile(PERSISTENCE_ID, "three", 1, 3000);
+
+        createSnapshotFile("member-1-shard-default-oper", "foo", 0, 1000);
+        createSnapshotFile("member-1-shard-toaster-oper", "foo", 0, 1000);
+        new File(SNAPSHOT_DIR, "other").createNewFile();
+        new File(SNAPSHOT_DIR, "other-1485349217290").createNewFile();
+
+        SnapshotMetadata metadata3 = new SnapshotMetadata(PERSISTENCE_ID, 1, 3000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata3, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "three", possibleSnapshot.get().snapshot());
+    }
+
+    @Test
+    public void testDoLoadAsyncWithNoSnapshots() throws IOException {
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", FALSE, possibleSnapshot.nonEmpty());
+    }
+
+    @Test
+    public void testDoLoadAsyncWithRetry() throws IOException  {
+        createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
+        createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
+
+        SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 0, 1000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
+    }
+
+    @Test(expected = IOException.class)
+    public void testDoLoadAsyncWithFailure() throws IOException {
+        createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new SnapshotProtocol.LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotFailed failed = probe.expectMsgClass(LoadSnapshotFailed.class);
+        Throwables.propagateIfInstanceOf(failed.cause(), IOException.class);
+    }
+
+    @Test
+    public void testDoLoadAsyncWithAkkaSerializedSnapshot() throws IOException {
+        SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) system);
+
+        String name = toSnapshotName(PERSISTENCE_ID, 1, 1000);
+        try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
+            fos.write(snapshotSerializer.toBinary(new Snapshot("one")));
+        }
+
+        SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 1, 1000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
+    }
+
+    private void createSnapshotFile(String persistenceId, String payload, int seqNr, int timestamp) throws IOException {
+        String name = toSnapshotName(persistenceId, seqNr, timestamp);
+        try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
+            if (payload != null) {
+                fos.write(SerializationUtils.serialize(payload));
+            }
+        }
+    }
+
+    private static String toSnapshotName(String persistenceId, int seqNr, int timestamp) {
+        return "snapshot-" + persistenceId + "-" + seqNr + "-" + timestamp;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
new file mode 100644 (file)
index 0000000..83b1ece
--- /dev/null
@@ -0,0 +1,7 @@
+akka {
+  persistence {
+      snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
+      snapshot-store.plugin = akka.persistence.snapshot-store.local
+      snapshot-store.local.dir = "target/snapshots"
+    }
+}
index af200deedc0667baa06a1f1335a20605938b18bb..a9591f1a77c11800ef0e8063d1e8d3d53ec57c63 100644 (file)
@@ -92,6 +92,8 @@ odl-cluster-data {
 
     persistence {
       journal.plugin = akka.persistence.journal.leveldb
+
+      snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
       snapshot-store.plugin = akka.persistence.snapshot-store.local
     }
   }