*/
package org.opendaylight.controller.cluster.datastore;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public abstract class AbstractDistributedDataStoreIntegrationTest {
writeTx = dataStore.newWriteOnlyTransaction();
- final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
writeTx.write(carPath, car);
// Verify the data in the store
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
// 2. Write some data
final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
- final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
readWriteTx.write(nodePath, nodeToWrite);
// 3. Read the data from Tx
final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
assertEquals("exists", Boolean.TRUE, exists);
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
+ assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS));
// 4. Ready the Tx for commit
final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// 6. Verify the data in the store
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
+ assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
}
}
readWriteTx = dataStore.newReadWriteTransaction();
- final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
readWriteTx.write(carPath, car);
final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
assertEquals("exists", Boolean.TRUE, exists);
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
+ assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
testKit.doCommit(readWriteTx.ready());
// Verify the data in the store
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
testKit.doCommit(writeTx.ready());
- writeTx = txChain.newWriteOnlyTransaction();
-
int numCars = 5;
for (int i = 0; i < numCars; i++) {
- writeTx.write(CarsModel.newCarPath("car" + i),
- CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+
+ testKit.doCommit(writeTx.ready());
+
+ try (var tx = txChain.newReadOnlyTransaction()) {
+ tx.read(CarsModel.BASE_PATH).get();
+ }
}
- testKit.doCommit(writeTx.ready());
+ // wait to let the shard catch up with purged
+ await("transaction state propagation").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ // verify frontend metadata has no holes in purged transactions causing overtime memory leak
+ final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ final var clientMeta = frontendMetadata.getClients().get(0);
+ if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ assertTellMetadata(clientMeta);
+ } else {
+ assertAskMetadata(clientMeta);
+ }
+ });
- final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
- .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
+ .get(5, TimeUnit.SECONDS)
+ .orElseThrow()
+ .body();
+ assertThat(body, instanceOf(Collection.class));
+ assertEquals("# cars", numCars, ((Collection<?>) body).size());
+ }
+ }
+
+ private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
+ // ask based should track no metadata
+ assertEquals(List.of(), clientMeta.getCurrentHistories());
+ }
+
+ private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
+ final var iterator = clientMeta.getCurrentHistories().iterator();
+ var metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
}
+ assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
}
@SuppressWarnings("checkstyle:IllegalCatch")
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: "
+ Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
// 2. Write some data
- final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
writeTx.write(TestModel.TEST_PATH, testNode);
// 3. Ready the Tx for commit
// the data from the first
// Tx is visible after being readied.
DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
- Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", testNode, optional.get());
+ assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
// 6. Create a new RW Tx from the chain, write more data,
// and ready it
// from the last RW Tx to
// verify it is visible.
readTx = txChain.newReadWriteTransaction();
- optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", outerNode, optional.get());
+ assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
// 8. Wait for the 2 commits to complete and close the
// chain.
// 9. Create a new read Tx from the data store and verify
// committed data.
readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", outerNode, optional.get());
+ assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
}
}
final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
- final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
readWriteTx.write(carPath, car);
final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
readWriteTx.merge(personPath, person);
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertFalse("isPresent", optional.isPresent());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
- CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
futures.add(rwTx.commit());
}
f.get(5, TimeUnit.SECONDS);
}
- final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
.read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.orElseThrow().body()).size());
txChain.close();
final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
- final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+ final Optional<NormalizedNode> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
assertFalse("isPresent", optional.isPresent());
txChain.close();
final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
// Create read-only tx's and issue a read.
- FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
+ FluentFuture<Optional<NormalizedNode>> readFuture1 = txChain
.newReadOnlyTransaction().read(TestModel.TEST_PATH);
- FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
+ FluentFuture<Optional<NormalizedNode>> readFuture2 = txChain
.newReadOnlyTransaction().read(TestModel.TEST_PATH);
// Create another write tx and issue the write.
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
PeopleModel.emptyContainer());
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+ final ContainerNode invalidData = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build();
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
PeopleModel.emptyContainer());
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+ final ContainerNode invalidData = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build();
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
final String name = "transactionIntegrationTest";
final ContainerNode carsNode = CarsModel.newCarsNode(
- CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
- CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
+ CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
+ CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
DataTree dataTree = new InMemoryDataTreeFactory().create(
DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
- NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+ NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
final Snapshot carsSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
SchemaContextHelper.full());
- final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+ final NormalizedNode peopleNode = PeopleModel.create();
AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
- root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
+ root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
final Snapshot peopleSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// two reads
- Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", carsNode, optional.get());
+ assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+ }
+ }
- optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", peopleNode, optional.get());
+ @Test
+ public void testSnapshotOnRootOverwrite() throws Exception {
+ if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
+ // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
+ return;
}
+
+ final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
+ datastoreContextBuilder.snapshotOnRootOverwrite(true));
+ try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
+ testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
+ true, "cars", "default")) {
+
+ ContainerNode rootNode = Builders.containerBuilder()
+ .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
+ .withChild(CarsModel.create())
+ .build();
+
+ testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+ IntegrationTestKit.verifyShardState(dataStore, "cars",
+ state -> assertEquals(1, state.getSnapshotIndex()));
+
+ // root has been written expect snapshot at index 0
+ verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
+
+ for (int i = 0; i < 10; i++) {
+ testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
+ CarsModel.newCarEntry("car " + i, Uint64.ONE));
+ }
+
+ // fake snapshot causes the snapshotIndex to move
+ IntegrationTestKit.verifyShardState(dataStore, "cars",
+ state -> assertEquals(10, state.getSnapshotIndex()));
+
+ // however the real snapshot still has not changed and was taken at index 0
+ verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
+
+ // root overwrite so expect a snapshot
+ testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+
+ // this was a real snapshot so everything should be in it(1 + 10 + 1)
+ IntegrationTestKit.verifyShardState(dataStore, "cars",
+ state -> assertEquals(12, state.getSnapshotIndex()));
+
+ verifySnapshot("member-1-shard-cars-testRootOverwrite", 12, 1);
+ }
+ }
+
+ private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex,
+ final long lastAppliedTerm) {
+ await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
+ assertEquals(1, snap.size());
+ assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
+ assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
+ }
+ );
}
}