From: Tom Pantelis
Date: Thu, 26 Jan 2017 16:49:43 +0000 (-0500)
Subject: Bug 7521: Add custom local snapshot store
X-Git-Tag: release/carbon~238
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ed6ec368fa9aa9b4cd770769e264c19ddc7549ea

Bug 7521: Add custom local snapshot store

Akka's LocalSnapshotStore serializes to a byte[] before persisting to the
file, so we need to write our own LocalSnapshotStore that serializes directly
to the file. I patterned the code after akka's LocalSnapshotStore, mainly
translated from scala.

Akka's LocalSnapshotStore wraps the payload data in a Snapshot class and uses
the SnapshotSerializer, which locates the serializer for the payload class and
writes some header data before delegating to the payload serializer. To handle
backwards compatibility for a snapshot previously serialized in akka's format,
java de-serialization detects the invalid stream header and fails, in which
case we fall back and try de-serialization via the SnapshotSerializer.

Akka provides a standard test suite in SnapshotStoreSpec for testing custom
snapshot store plugins. I derived a LocalSnapshotStoreSpecTest class that does
the setup and teardown, with SnapshotStoreSpec doing the rest.
SnapshotStoreSpec uses ScalaTest, so it needs to be run with scala's
JUnitRunner. I also added a regular LocalSnapshotStoreTest class to cover a
few cases that SnapshotStoreSpec doesn't.

Change-Id: I1ca11682f37aa39d60d3ce57c874c299627e8ca6
Signed-off-by: Tom Pantelis
---

diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml
index 899b8b875b..899b8e34f8 100644
--- a/opendaylight/md-sal/sal-clustering-commons/pom.xml
+++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml
@@ -43,6 +43,12 @@
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+
@@ -72,7 +78,10 @@
       <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-testkit_${scala.version}</artifactId>
-      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.typesafe.akka</groupId>
+      <artifactId>akka-persistence-tck_${scala.version}</artifactId>
     </dependency>
@@ -201,6 +210,7 @@
             ${project.groupId}.${project.artifactId}
             org.opendaylight.controller.cluster.schema.provider.impl, {local-packages}
+            *
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
new file mode 100644
index 0000000000..457736d5c4
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStore.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import akka.actor.ExtendedActorSystem;
+import akka.dispatch.Futures;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.serialization.Snapshot;
+import akka.persistence.serialization.SnapshotSerializer;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+/**
+ * Akka SnapshotStore implementation backed by the local file system. This class was patterned after akka's
+ * LocalSnapshotStore class and exists because akka's version serializes to a byte[] before persisting
+ * to the file which will fail if the data reaches or exceeds Integer.MAX_VALUE in size. This class avoids that issue
+ * by serializing the data directly to the file.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalSnapshotStore extends SnapshotStore {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalSnapshotStore.class);
+
+    private static final int PERSISTENCE_ID_START_INDEX = "snapshot-".length();
+
+    private final ExecutionContext executionContext;
+    private final int maxLoadAttempts;
+    private final File snapshotDir;
+
+    public LocalSnapshotStore(Config config) {
+        this.executionContext = context().system().dispatchers().lookup(config.getString("stream-dispatcher"));
+        snapshotDir = new File(config.getString("dir"));
+
+        int localMaxLoadAttempts = config.getInt("max-load-attempts");
+        maxLoadAttempts = localMaxLoadAttempts > 0 ? localMaxLoadAttempts : 1;
+
+        LOG.debug("LocalSnapshotStore ctor: snapshotDir: {}, maxLoadAttempts: {}", snapshotDir, maxLoadAttempts);
+    }
+
+    @Override
+    public void preStart() throws Exception {
+        if (!snapshotDir.isDirectory()) {
+            // Try to create the directory, on failure double check if someone else beat us to it.
+            if (!snapshotDir.mkdirs() && !snapshotDir.isDirectory()) {
+                throw new IOException("Failed to create snapshot directory " + snapshotDir.getCanonicalPath());
+            }
+        }
+
+        super.preStart();
+    }
+
+    @Override
+    public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+        LOG.debug("In doLoadAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
+
+        // Select the youngest 'maxLoadAttempts' snapshots that match the criteria. This may help in situations where
+        // saving of a snapshot could not be completed because of a JVM crash. Hence, an attempt to load that snapshot
+        // will fail but loading an older snapshot may succeed.
+
+        Deque<SnapshotMetadata> metadatas = getSnapshotMetadatas(persistenceId, criteria).stream()
+                .sorted(LocalSnapshotStore::compare).collect(reverse()).stream().limit(maxLoadAttempts)
+                    .collect(Collectors.toCollection(ArrayDeque::new));
+
+        if (metadatas.isEmpty()) {
+            return Futures.successful(Optional.empty());
+        }
+
+        LOG.debug("doLoadAsync - found: {}", metadatas);
+
+        return Futures.future(() -> doLoad(metadatas), executionContext);
+    }
+
+    private Optional<SelectedSnapshot> doLoad(Deque<SnapshotMetadata> metadatas) throws IOException {
+        SnapshotMetadata metadata = metadatas.removeFirst();
+        File file = toSnapshotFile(metadata, "");
+
+        LOG.debug("doLoad {}", file);
+
+        try {
+            Object data = deserialize(file);
+
+            LOG.debug("deserialized data: {}", data);
+
+            return Optional.of(new SelectedSnapshot(metadata, data));
+        } catch (IOException e) {
+            LOG.error("Error loading snapshot file {}, remaining attempts: {}", file, metadatas.size(), e);
+
+            if (metadatas.isEmpty()) {
+                throw e;
+            }
+
+            return doLoad(metadatas);
+        }
+    }
+
+    private Object deserialize(File file) throws IOException {
+        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
+            return in.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Error loading snapshot file " + file, e);
+        } catch (IOException e) {
+            LOG.debug("Error loading snapshot file {}", file, e);
+
+            return tryDeserializeAkkaSnapshot(file);
+        }
+    }
+
+    private Object tryDeserializeAkkaSnapshot(File file) throws IOException {
+        LOG.debug("tryDeserializeAkkaSnapshot {}", file);
+
+        // The snapshot was probably previously stored via akka's LocalSnapshotStore which wraps the data
+        // in a Snapshot instance and uses the SnapshotSerializer to serialize it to a byte[]. So we'll use
+        // the SnapshotSerializer to try to de-serialize it.
+
+        SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) context().system());
+
+        try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
+            return ((Snapshot)snapshotSerializer.fromBinary(ByteStreams.toByteArray(in))).data();
+        }
+    }
+
+    @Override
+    public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
+        LOG.debug("In doSaveAsync - metadata: {}, snapshot: {}", metadata, snapshot);
+
+        return Futures.future(() -> doSave(metadata, snapshot), executionContext);
+    }
+
+    private Void doSave(SnapshotMetadata metadata, Object snapshot) throws IOException {
+        File temp = toSnapshotFile(metadata, ".tmp");
+
+        LOG.debug("Saving to temp file: {}", temp);
+
+        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(temp))) {
+            out.writeObject(snapshot);
+        } catch (IOException e) {
+            LOG.error("Error saving snapshot file {}", temp, e);
+            throw e;
+        }
+
+        File actual = toSnapshotFile(metadata, "");
+
+        LOG.debug("Renaming to: {}", actual);
+
+        if (!temp.renameTo(actual)) {
+            throw new IOException(String.format("Failed to rename %s to %s", temp, actual));
+        }
+
+        return null;
+    }
+
+    @Override
+    public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
+        LOG.debug("In doDeleteAsync - metadata: {}", metadata);
+
+        // Multiple snapshot files here mean that there were multiple snapshots for this seqNr - we delete all of them.
+        // Usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we
+        // timestamp snapshots and allow multiple to be kept around (for the same seqNr) if desired.
+
+        return Futures.future(() -> doDelete(metadata), executionContext);
+    }
+
+    @Override
+    public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
+        LOG.debug("In doDeleteAsync - persistenceId: {}, criteria: {}", persistenceId, criteria);
+
+        return Futures.future(() -> doDelete(persistenceId, criteria), executionContext);
+    }
+
+    private Void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) {
+        final List<File> files = getSnapshotMetadatas(persistenceId, criteria).stream()
+                .flatMap(md -> Stream.of(toSnapshotFile(md, ""))).collect(Collectors.toList());
+
+        LOG.debug("Deleting files: {}", files);
+
+        files.forEach(file -> file.delete());
+        return null;
+    }
+
+    private Void doDelete(SnapshotMetadata metadata) {
+        final Collection<File> files = getSnapshotFiles(metadata);
+
+        LOG.debug("Deleting files: {}", files);
+
+        files.forEach(file -> file.delete());
+        return null;
+    }
+
+    private Collection<File> getSnapshotFiles(String persistenceId) {
+        String encodedPersistenceId = encode(persistenceId);
+
+        File[] files = snapshotDir.listFiles((FilenameFilter) (dir, name) -> {
+            int persistenceIdEndIndex = name.lastIndexOf('-', name.lastIndexOf('-') - 1);
+            return PERSISTENCE_ID_START_INDEX + encodedPersistenceId.length() == persistenceIdEndIndex
+                    && name.startsWith(encodedPersistenceId, PERSISTENCE_ID_START_INDEX) && !name.endsWith(".tmp");
+        });
+
+        if (files == null) {
+            return Collections.emptyList();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getSnapshotFiles for persistenceId: {}, found files: {}", encodedPersistenceId,
+                    Arrays.toString(files));
+        }
+
+        return Arrays.asList(files);
+    }
+
+    private Collection<File> getSnapshotFiles(SnapshotMetadata metadata) {
+        return getSnapshotFiles(metadata.persistenceId()).stream().filter(file -> {
+            SnapshotMetadata possible = extractMetadata(file);
+            return possible != null && possible.sequenceNr() == metadata.sequenceNr()
+                    && (metadata.timestamp() == 0L || possible.timestamp() == metadata.timestamp());
+        }).collect(Collectors.toList());
+    }
+
+    private Collection<SnapshotMetadata> getSnapshotMetadatas(String persistenceId,
+            SnapshotSelectionCriteria criteria) {
+        return getSnapshotFiles(persistenceId).stream().flatMap(file -> toStream(extractMetadata(file)))
+                .filter(md -> criteria.matches(md)).collect(Collectors.toList());
+    }
+
+    private static Stream<SnapshotMetadata> toStream(@Nullable SnapshotMetadata md) {
+        return md != null ? Stream.of(md) : Stream.empty();
+    }
+
+    @Nullable
+    private static SnapshotMetadata extractMetadata(File file) {
+        String name = file.getName();
+        int sequenceNumberEndIndex = name.lastIndexOf('-');
+        int persistenceIdEndIndex = name.lastIndexOf('-', sequenceNumberEndIndex - 1);
+        if (PERSISTENCE_ID_START_INDEX >= persistenceIdEndIndex) {
+            return null;
+        }
+
+        try {
+            String persistenceId = name.substring(PERSISTENCE_ID_START_INDEX, persistenceIdEndIndex);
+            long sequenceNumber = Long.parseLong(name.substring(persistenceIdEndIndex + 1, sequenceNumberEndIndex));
+            long timestamp = Long.parseLong(name.substring(sequenceNumberEndIndex + 1));
+            return new SnapshotMetadata(persistenceId, sequenceNumber, timestamp);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    private File toSnapshotFile(SnapshotMetadata metadata, String extension) {
+        return new File(snapshotDir, String.format("snapshot-%s-%d-%d%s", encode(metadata.persistenceId()),
+                metadata.sequenceNr(), metadata.timestamp(), extension));
+    }
+
+    private static Collector<SnapshotMetadata, ?, List<SnapshotMetadata>> reverse() {
+        return Collectors.collectingAndThen(Collectors.toList(), list -> {
+            Collections.reverse(list);
+            return list;
+        });
+    }
+
+    private String encode(String str) {
+        try {
+            return URLEncoder.encode(str, StandardCharsets.UTF_8.name());
+        } catch (UnsupportedEncodingException e) {
+            // Shouldn't happen
+            LOG.warn("Error encoding {}", str, e);
+            return str;
+        }
+    }
+
+    @VisibleForTesting
+    static int compare(SnapshotMetadata m1, SnapshotMetadata m2) {
+        return (int) (!m1.persistenceId().equals(m2.persistenceId())
+                ? m1.persistenceId().compareTo(m2.persistenceId()) :
+                    m1.sequenceNr() != m2.sequenceNr() ? m1.sequenceNr() - m2.sequenceNr() :
+                        m1.timestamp() != m2.timestamp() ? m1.timestamp() - m2.timestamp() : 0);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java
new file mode 100644
index 0000000000..7b70063afa
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreSpecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import akka.persistence.snapshot.SnapshotStoreSpec;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.RunWith;
+import org.scalatest.junit.JUnitRunner;
+
+/**
+ * Tests the LocalSnapshotStore using akka's standard test suite for snapshot store plugins via SnapshotStoreSpec.
+ * This class basically does the setup and tear down with SnapshotStoreSpec doing the rest. SnapshotStoreSpec uses
+ * ScalaTest so it needs to be run with scala's JUnitRunner.
+ *
+ * @author Thomas Pantelis
+ */
+@RunWith(JUnitRunner.class)
+public class LocalSnapshotStoreSpecTest extends SnapshotStoreSpec {
+    private static final long serialVersionUID = 1L;
+
+    static final File SNAPSHOT_DIR = new File("target/snapshots");
+
+    public LocalSnapshotStoreSpecTest() {
+        super(ConfigFactory.load("LocalSnapshotStoreTest.conf"));
+    }
+
+    @Override
+    public void afterAll() {
+        super.afterAll();
+        FileUtils.deleteQuietly(SNAPSHOT_DIR);
+    }
+
+    static void cleanSnapshotDir() {
+        if (!SNAPSHOT_DIR.exists()) {
+            return;
+        }
+
+        try {
+            FileUtils.cleanDirectory(SNAPSHOT_DIR);
+        } catch (IOException e) {
+            // Ignore
+        }
+    }
+
+    static void createSnapshotDir() {
+        if (!SNAPSHOT_DIR.exists() && !SNAPSHOT_DIR.mkdirs()) {
+            throw new RuntimeException("Failed to create " + SNAPSHOT_DIR);
+        }
+
+        cleanSnapshotDir();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java
new file mode 100644
index 0000000000..964b663d6f
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/persistence/LocalSnapshotStoreTest.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.persistence;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+import static org.junit.Assert.assertEquals;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.SNAPSHOT_DIR;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.cleanSnapshotDir;
+import static org.opendaylight.controller.cluster.persistence.LocalSnapshotStoreSpecTest.createSnapshotDir;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.persistence.Persistence;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotProtocol;
+import akka.persistence.SnapshotProtocol.LoadSnapshot;
+import akka.persistence.SnapshotProtocol.LoadSnapshotFailed;
+import akka.persistence.SnapshotProtocol.LoadSnapshotResult;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.serialization.Snapshot;
+import akka.persistence.serialization.SnapshotSerializer;
+import akka.testkit.JavaTestKit;
+import com.google.common.base.Throwables;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+import scala.Option;
+
+/**
+ * Unit tests for LocalSnapshotStore. These are in addition to LocalSnapshotStoreSpecTest to cover a few cases
+ * that SnapshotStoreSpec doesn't.
+ *
+ * @author Thomas Pantelis
+ */
+public class LocalSnapshotStoreTest {
+    private static final String PERSISTENCE_ID = "member-1-shard-default-config";
+
+    private static ActorSystem system;
+    private static ActorRef snapshotStore;
+
+    @BeforeClass
+    public static void staticSetup() {
+        createSnapshotDir();
+
+        system = ActorSystem.create("test", ConfigFactory.load("LocalSnapshotStoreTest.conf"));
+        snapshotStore = system.registerExtension(Persistence.lookup()).snapshotStoreFor(null);
+    }
+
+    @AfterClass
+    public static void staticCleanup() {
+        FileUtils.deleteQuietly(SNAPSHOT_DIR);
+        JavaTestKit.shutdownActorSystem(system);
+    }
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+        cleanSnapshotDir();
+    }
+
+    @After
+    public void cleanup() {
+        cleanSnapshotDir();
+    }
+
+    @Test
+    public void testDoLoadAsync() throws IOException {
+        createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
+        createSnapshotFile(PERSISTENCE_ID, "two", 1, 2000);
+        createSnapshotFile(PERSISTENCE_ID, "three", 1, 3000);
+
+        createSnapshotFile("member-1-shard-default-oper", "foo", 0, 1000);
+        createSnapshotFile("member-1-shard-toaster-oper", "foo", 0, 1000);
+        new File(SNAPSHOT_DIR, "other").createNewFile();
+        new File(SNAPSHOT_DIR, "other-1485349217290").createNewFile();
+
+        SnapshotMetadata metadata3 = new SnapshotMetadata(PERSISTENCE_ID, 1, 3000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata3, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "three", possibleSnapshot.get().snapshot());
+    }
+
+    @Test
+    public void testDoLoadAsyncWithNoSnapshots() throws IOException {
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", FALSE, possibleSnapshot.nonEmpty());
+    }
+
+    @Test
+    public void testDoLoadAsyncWithRetry() throws IOException {
+        createSnapshotFile(PERSISTENCE_ID, "one", 0, 1000);
+        createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
+
+        SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 0, 1000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
+    }
+
+    @Test(expected = IOException.class)
+    public void testDoLoadAsyncWithFailure() throws IOException {
+        createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new SnapshotProtocol.LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotFailed failed = probe.expectMsgClass(LoadSnapshotFailed.class);
+        Throwables.propagateIfInstanceOf(failed.cause(), IOException.class);
+    }
+
+    @Test
+    public void testDoLoadAsyncWithAkkaSerializedSnapshot() throws IOException {
+        SnapshotSerializer snapshotSerializer = new SnapshotSerializer((ExtendedActorSystem) system);
+
+        String name = toSnapshotName(PERSISTENCE_ID, 1, 1000);
+        try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
+            fos.write(snapshotSerializer.toBinary(new Snapshot("one")));
+        }
+
+        SnapshotMetadata metadata = new SnapshotMetadata(PERSISTENCE_ID, 1, 1000);
+
+        JavaTestKit probe = new JavaTestKit(system);
+        snapshotStore.tell(new LoadSnapshot(PERSISTENCE_ID,
+                SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
+        LoadSnapshotResult result = probe.expectMsgClass(LoadSnapshotResult.class);
+        Option<SelectedSnapshot> possibleSnapshot = result.snapshot();
+
+        assertEquals("SelectedSnapshot present", TRUE, possibleSnapshot.nonEmpty());
+        assertEquals("SelectedSnapshot metadata", metadata, possibleSnapshot.get().metadata());
+        assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
+    }
+
+    private void createSnapshotFile(String persistenceId, String payload, int seqNr, int timestamp)
+            throws IOException {
+        String name = toSnapshotName(persistenceId, seqNr, timestamp);
+        try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
+            if (payload != null) {
+                fos.write(SerializationUtils.serialize(payload));
+            }
+        }
+    }
+
+    private static String toSnapshotName(String persistenceId, int seqNr, int timestamp) {
+        return "snapshot-" + persistenceId + "-" + seqNr + "-" + timestamp;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
new file mode 100644
index 0000000000..83b1ece6f2
--- /dev/null
+++ b/opendaylight/md-sal/sal-clustering-commons/src/test/resources/LocalSnapshotStoreTest.conf
@@ -0,0 +1,7 @@
+akka {
+    persistence {
+        snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
+        snapshot-store.plugin = akka.persistence.snapshot-store.local
+        snapshot-store.local.dir = "target/snapshots"
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
index af200deedc..a9591f1a77 100644
--- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
+++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/factory-akka.conf
@@ -92,6 +92,8 @@ odl-cluster-data {
     persistence {
       journal.plugin = akka.persistence.journal.leveldb
+
+      snapshot-store.local.class = "org.opendaylight.controller.cluster.persistence.LocalSnapshotStore"
       snapshot-store.plugin = akka.persistence.snapshot-store.local
     }
   }
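
For illustration only, and not part of this change: a minimal sketch of how a persistent actor would exercise the store configured above, assuming Akka 2.4's Java persistence API. The actor class ExampleActor and its persistenceId are hypothetical.

import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedPersistentActor;

// Hypothetical actor used only to illustrate the snapshot-store interaction.
public class ExampleActor extends UntypedPersistentActor {
    private String state = "";

    @Override
    public String persistenceId() {
        // Hypothetical id; LocalSnapshotStore URL-encodes it into the snapshot file name.
        return "example-actor";
    }

    @Override
    public void onReceiveRecover(Object message) {
        if (message instanceof SnapshotOffer) {
            // On recovery, doLoadAsync supplies the youngest readable snapshot, falling back to
            // akka's SnapshotSerializer format for files written before this change.
            state = (String) ((SnapshotOffer) message).snapshot();
        }
    }

    @Override
    public void onReceiveCommand(Object message) {
        if ("snap".equals(message)) {
            // Triggers doSaveAsync, which serializes the state directly to
            // snapshot-<encoded persistenceId>-<sequenceNr>-<timestamp> under the configured dir.
            saveSnapshot(state);
        } else if (message instanceof SaveSnapshotSuccess) {
            // The snapshot file has been written and renamed from its .tmp name.
        } else if (message instanceof String) {
            state = (String) message;
        }
    }
}

With the factory-akka.conf change above, such an actor's saveSnapshot() call goes through this direct-to-file serialization rather than akka's byte[]-based LocalSnapshotStore.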