cbe494f06c91f5a20fcf155bf8fc13cc7cc0ddc4
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.junit.Assert.fail;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import com.google.common.base.Throwables;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.collect.Range;
26 import com.google.common.primitives.UnsignedLong;
27 import com.google.common.util.concurrent.FluentFuture;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Optional;
38 import java.util.Set;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.junit.Ignore;
44 import org.junit.Test;
45 import org.junit.runners.Parameterized.Parameter;
46 import org.mockito.Mockito;
47 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
48 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
49 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
53 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
54 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
55 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
56 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
57 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
58 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
59 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
60 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
61 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
62 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
63 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
64 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
65 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
66 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
68 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
69 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
70 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
72 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
73 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
75 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
76 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
77 import org.opendaylight.yangtools.concepts.ListenerRegistration;
78 import org.opendaylight.yangtools.yang.common.Uint64;
79 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
80 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
81 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
82 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
83 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
84 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
85 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
86 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
87 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
88 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
89 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
90
91 public abstract class AbstractDistributedDataStoreIntegrationTest {
92
93     @Parameter
94     public Class<? extends AbstractDataStore> testParameter;
95
96     protected ActorSystem system;
97
98     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
99             .shardHeartbeatIntervalInMillis(100);
100
101     protected ActorSystem getSystem() {
102         return system;
103     }
104
105     @Test
106     public void testWriteTransactionWithSingleShard() throws Exception {
107         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
108         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
109             testParameter, "transactionIntegrationTest", "test-1")) {
110
111             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
112                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
113
114             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
115                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
116                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
117                 .build());
118         }
119     }
120
121     @Test
122     public void testWriteTransactionWithMultipleShards() throws Exception {
123         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
124         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
125             testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
126
127             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
128             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
129
130             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
131             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
132
133             testKit.doCommit(writeTx.ready());
134
135             writeTx = dataStore.newWriteOnlyTransaction();
136
137             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
138             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
139
140             testKit.doCommit(writeTx.ready());
141
142             writeTx = dataStore.newWriteOnlyTransaction();
143
144             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
145             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
146             writeTx.write(carPath, car);
147
148             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
149             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
150             writeTx.write(personPath, person);
151
152             testKit.doCommit(writeTx.ready());
153
154             // Verify the data in the store
155             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
156
157             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
158             assertTrue("isPresent", optional.isPresent());
159             assertEquals("Data node", car, optional.get());
160
161             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
162             assertTrue("isPresent", optional.isPresent());
163             assertEquals("Data node", person, optional.get());
164         }
165     }
166
167     @Test
168     public void testReadWriteTransactionWithSingleShard() throws Exception {
169         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
170         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
171             testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
172
173             // 1. Create a read-write Tx
174             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
175             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
176
177             // 2. Write some data
178             final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
179             final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
180             readWriteTx.write(nodePath, nodeToWrite);
181
182             // 3. Read the data from Tx
183             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
184             assertEquals("exists", Boolean.TRUE, exists);
185
186             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
187             assertTrue("isPresent", optional.isPresent());
188             assertEquals("Data node", nodeToWrite, optional.get());
189
190             // 4. Ready the Tx for commit
191             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
192
193             // 5. Commit the Tx
194             testKit.doCommit(cohort);
195
196             // 6. Verify the data in the store
197             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
198
199             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
200             assertTrue("isPresent", optional.isPresent());
201             assertEquals("Data node", nodeToWrite, optional.get());
202         }
203     }
204
205     @Test
206     public void testReadWriteTransactionWithMultipleShards() throws Exception {
207         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
208         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
209             testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
210
211             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
212             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
213
214             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
215             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
216
217             testKit.doCommit(readWriteTx.ready());
218
219             readWriteTx = dataStore.newReadWriteTransaction();
220
221             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
222             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
223
224             testKit.doCommit(readWriteTx.ready());
225
226             readWriteTx = dataStore.newReadWriteTransaction();
227
228             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
229             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
230             readWriteTx.write(carPath, car);
231
232             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
233             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
234             readWriteTx.write(personPath, person);
235
236             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
237             assertEquals("exists", Boolean.TRUE, exists);
238
239             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
240             assertTrue("isPresent", optional.isPresent());
241             assertEquals("Data node", car, optional.get());
242
243             testKit.doCommit(readWriteTx.ready());
244
245             // Verify the data in the store
246             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
247
248             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
249             assertTrue("isPresent", optional.isPresent());
250             assertEquals("Data node", car, optional.get());
251
252             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
253             assertTrue("isPresent", optional.isPresent());
254             assertEquals("Data node", person, optional.get());
255         }
256     }
257
258     @Test
259     @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
260     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
261         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
262         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
263             testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
264
265             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
266
267             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
268             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
269             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
270             testKit.doCommit(writeTx.ready());
271
272             int numCars = 5;
273             for (int i = 0; i < numCars; i++) {
274                 writeTx = txChain.newWriteOnlyTransaction();
275                 writeTx.write(CarsModel.newCarPath("car" + i),
276                     CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
277
278                 testKit.doCommit(writeTx.ready());
279
280                 DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
281                 domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
282
283                 domStoreReadTransaction.close();
284             }
285
286             // verify frontend metadata has no holes in purged transactions causing overtime memory leak
287             Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
288             FrontendShardDataTreeSnapshotMetadata frontendMetadata =
289                     (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
290                             .executeOperation(localShard.get(), new RequestFrontendMetadata());
291
292             if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
293                 Iterator<FrontendHistoryMetadata> iterator =
294                         frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
295                 FrontendHistoryMetadata metadata = iterator.next();
296                 while (iterator.hasNext() && metadata.getHistoryId() != 1) {
297                     metadata = iterator.next();
298                 }
299                 Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
300
301                 assertEquals(1, ranges.size());
302             } else {
303                 // ask based should track no metadata
304                 assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
305             }
306
307             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
308                     .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
309             assertTrue("isPresent", optional.isPresent());
310             assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
311         }
312     }
313
314     @SuppressWarnings("checkstyle:IllegalCatch")
315     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
316             throws Exception {
317         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
318         final String shardName = "default";
319
320         // We don't want the shard to become the leader so prevent shard
321         // elections.
322         datastoreContextBuilder.customRaftPolicyImplementation(
323                 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
324
325         // The ShardManager uses the election timeout for FindPrimary so
326         // reset it low so it will timeout quickly.
327         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
328         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
329
330         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
331
332             final Object result = dataStore.getActorUtils().executeOperation(
333                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
334             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
335
336             // Create the write Tx.
337             DOMStoreWriteTransaction writeTxToClose = null;
338             try {
339                 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
340                         : dataStore.newReadWriteTransaction();
341                 final DOMStoreWriteTransaction writeTx = writeTxToClose;
342                 assertNotNull("newReadWriteTransaction returned null", writeTx);
343
344                 // Do some modifications and ready the Tx on a separate
345                 // thread.
346                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
347                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
348                 final CountDownLatch txReady = new CountDownLatch(1);
349                 final Thread txThread = new Thread(() -> {
350                     try {
351                         writeTx.write(TestModel.JUNK_PATH,
352                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
353
354                         txCohort.set(writeTx.ready());
355                     } catch (Exception e) {
356                         caughtEx.set(e);
357                     } finally {
358                         txReady.countDown();
359                     }
360                 });
361
362                 txThread.start();
363
364                 // Wait for the Tx operations to complete.
365                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
366                 if (caughtEx.get() != null) {
367                     throw caughtEx.get();
368                 }
369
370                 assertTrue("Tx ready", done);
371
372                 // Wait for the commit to complete. Since no shard
373                 // leader was elected in time, the Tx
374                 // should have timed out and throw an appropriate
375                 // exception cause.
376                 try {
377                     txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
378                     fail("Expected NoShardLeaderException");
379                 } catch (final ExecutionException e) {
380                     final String msg = "Unexpected exception: "
381                             + Throwables.getStackTraceAsString(e.getCause());
382                     if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
383                         assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
384                     } else {
385                         assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
386                     }
387                 }
388             } finally {
389                 try {
390                     if (writeTxToClose != null) {
391                         writeTxToClose.close();
392                     }
393                 } catch (Exception e) {
394                     // FIXME TransactionProxy.close throws IllegalStateException:
395                     // Transaction is ready, it cannot be closed
396                 }
397             }
398         }
399     }
400
401     @Test
402     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
403         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
404         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
405     }
406
407     @Test
408     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
409         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
410     }
411
412     @Test
413     public void testTransactionAbort() throws Exception {
414         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
415         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
416             testParameter, "transactionAbortIntegrationTest", "test-1")) {
417
418             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
419             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
420
421             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
422
423             final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
424
425             cohort.canCommit().get(5, TimeUnit.SECONDS);
426
427             cohort.abort().get(5, TimeUnit.SECONDS);
428
429             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
430                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
431         }
432     }
433
434     @Test
435     @SuppressWarnings("checkstyle:IllegalCatch")
436     public void testTransactionChainWithSingleShard() throws Exception {
437         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
438         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
439             testParameter, "testTransactionChainWithSingleShard", "test-1")) {
440
441             // 1. Create a Tx chain and write-only Tx
442             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
443
444             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
445             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
446
447             // 2. Write some data
448             final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
449             writeTx.write(TestModel.TEST_PATH, testNode);
450
451             // 3. Ready the Tx for commit
452             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
453
454             // 4. Commit the Tx on another thread that first waits for
455             // the second read Tx.
456             final CountDownLatch continueCommit1 = new CountDownLatch(1);
457             final CountDownLatch commit1Done = new CountDownLatch(1);
458             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
459             new Thread(() -> {
460                 try {
461                     continueCommit1.await();
462                     testKit.doCommit(cohort1);
463                 } catch (Exception e) {
464                     commit1Error.set(e);
465                 } finally {
466                     commit1Done.countDown();
467                 }
468             }).start();
469
470             // 5. Create a new read Tx from the chain to read and verify
471             // the data from the first
472             // Tx is visible after being readied.
473             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
474             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
475             assertTrue("isPresent", optional.isPresent());
476             assertEquals("Data node", testNode, optional.get());
477
478             // 6. Create a new RW Tx from the chain, write more data,
479             // and ready it
480             final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
481             final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
482                     .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
483                     .build();
484             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
485
486             final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
487
488             // 7. Create a new read Tx from the chain to read the data
489             // from the last RW Tx to
490             // verify it is visible.
491             readTx = txChain.newReadWriteTransaction();
492             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
493             assertTrue("isPresent", optional.isPresent());
494             assertEquals("Data node", outerNode, optional.get());
495
496             // 8. Wait for the 2 commits to complete and close the
497             // chain.
498             continueCommit1.countDown();
499             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
500
501             if (commit1Error.get() != null) {
502                 throw commit1Error.get();
503             }
504
505             testKit.doCommit(cohort2);
506
507             txChain.close();
508
509             // 9. Create a new read Tx from the data store and verify
510             // committed data.
511             readTx = dataStore.newReadOnlyTransaction();
512             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
513             assertTrue("isPresent", optional.isPresent());
514             assertEquals("Data node", outerNode, optional.get());
515         }
516     }
517
518     @Test
519     public void testTransactionChainWithMultipleShards() throws Exception {
520         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
521         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
522             testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
523
524             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
525
526             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
527             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
528
529             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
530             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
531
532             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
533             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
534
535             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
536
537             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
538
539             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
540             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
541             readWriteTx.write(carPath, car);
542
543             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
544             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
545             readWriteTx.merge(personPath, person);
546
547             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
548             assertTrue("isPresent", optional.isPresent());
549             assertEquals("Data node", car, optional.get());
550
551             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
552             assertTrue("isPresent", optional.isPresent());
553             assertEquals("Data node", person, optional.get());
554
555             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
556
557             writeTx = txChain.newWriteOnlyTransaction();
558
559             writeTx.delete(carPath);
560
561             final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
562
563             final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
564             final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
565
566             testKit.doCommit(canCommit1, cohort1);
567             testKit.doCommit(canCommit2, cohort2);
568             testKit.doCommit(cohort3);
569
570             txChain.close();
571
572             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
573
574             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
575             assertFalse("isPresent", optional.isPresent());
576
577             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
578             assertTrue("isPresent", optional.isPresent());
579             assertEquals("Data node", person, optional.get());
580         }
581     }
582
583     @Test
584     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
585         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
586         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
587             testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
588
589             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
590                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
591                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
592                 MoreExecutors.directExecutor());
593
594             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
595             DOMTransactionChain txChain = broker.createTransactionChain(listener);
596
597             final List<ListenableFuture<?>> futures = new ArrayList<>();
598
599             final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
600             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
601             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
602             futures.add(writeTx.commit());
603
604             int numCars = 100;
605             for (int i = 0; i < numCars; i++) {
606                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
607
608                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
609                     CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
610
611                 futures.add(rwTx.commit());
612             }
613
614             for (final ListenableFuture<?> f : futures) {
615                 f.get(5, TimeUnit.SECONDS);
616             }
617
618             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
619                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
620             assertTrue("isPresent", optional.isPresent());
621             assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
622
623             txChain.close();
624
625             broker.close();
626         }
627     }
628
629     @Test
630     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
631         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
632         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
633             testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
634
635             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
636
637             final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
638
639             rwTx1.ready();
640
641             final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
642
643             final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
644             assertFalse("isPresent", optional.isPresent());
645
646             txChain.close();
647         }
648     }
649
650     @Test
651     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
652         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
653         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
654             testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
655
656             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
657
658             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
659             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
660
661             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
662
663             // Try to create another Tx of each type - each should fail
664             // b/c the previous Tx wasn't
665             // readied.
666             testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
667         }
668     }
669
670     @Test
671     public void testCreateChainedTransactionAfterClose() throws Exception {
672         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
673         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
674             testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
675
676             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
677             txChain.close();
678
679             // Try to create another Tx of each type - should fail b/c
680             // the previous Tx was closed.
681             testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
682         }
683     }
684
685     @Test
686     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
687         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
688         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
689             testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
690
691             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
692
693             // Create a write tx and submit.
694             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
695             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
696             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
697
698             // Create read-only tx's and issue a read.
699             FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
700                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
701
702             FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
703                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
704
705             // Create another write tx and issue the write.
706             DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
707             writeTx2.write(TestModel.OUTER_LIST_PATH,
708                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
709                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
710                 .build());
711
712             // Ensure the reads succeed.
713
714             assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
715             assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
716
717             // Ensure the writes succeed.
718             DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
719
720             testKit.doCommit(cohort1);
721             testKit.doCommit(cohort2);
722
723             assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
724                 .get(5, TimeUnit.SECONDS).isPresent());
725         }
726     }
727
728     @Test
729     public void testChainedTransactionFailureWithSingleShard() throws Exception {
730         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
731         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
732             testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
733
734             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
735                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
736                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
737                 MoreExecutors.directExecutor());
738
739             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
740             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
741
742             final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
743
744             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
745                 PeopleModel.emptyContainer());
746
747             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
748                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
749                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
750
751             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
752
753             try {
754                 writeTx.commit().get(5, TimeUnit.SECONDS);
755                 fail("Expected TransactionCommitFailedException");
756             } catch (final ExecutionException e) {
757                 // Expected
758             }
759
760             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
761                 any(Throwable.class));
762
763             txChain.close();
764             broker.close();
765         }
766     }
767
768     @Test
769     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
770         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
771         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
772             testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
773
774             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
775                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
776                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
777                 MoreExecutors.directExecutor());
778
779             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
780             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
781
782             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
783
784             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
785                 PeopleModel.emptyContainer());
786
787             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
788                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
789                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
790
791             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
792
793             // Note that merge will validate the data and fail but put
794             // succeeds b/c deep validation is not
795             // done for put for performance reasons.
796             try {
797                 writeTx.commit().get(5, TimeUnit.SECONDS);
798                 fail("Expected TransactionCommitFailedException");
799             } catch (final ExecutionException e) {
800                 // Expected
801             }
802
803             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
804                 any(Throwable.class));
805
806             txChain.close();
807             broker.close();
808         }
809     }
810
811     @Test
812     public void testDataTreeChangeListenerRegistration() throws Exception {
813         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
814         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
815             testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
816
817             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
818                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
819
820             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
821
822             ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
823                     .registerTreeChangeListener(TestModel.TEST_PATH, listener);
824
825             assertNotNull("registerTreeChangeListener returned null", listenerReg);
826
827             IntegrationTestKit.verifyShardState(dataStore, "test-1",
828                 state -> assertEquals("getTreeChangeListenerActors", 1,
829                         state.getTreeChangeListenerActors().size()));
830
831             // Wait for the initial notification
832             listener.waitForChangeEvents(TestModel.TEST_PATH);
833             listener.reset(2);
834
835             // Write 2 updates.
836             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
837                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
838                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
839                 .build());
840
841             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
842                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
843             testKit.testWriteTransaction(dataStore, listPath,
844                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
845
846             // Wait for the 2 updates.
847             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
848             listenerReg.close();
849
850             IntegrationTestKit.verifyShardState(dataStore, "test-1",
851                 state -> assertEquals("getTreeChangeListenerActors", 0,
852                     state.getTreeChangeListenerActors().size()));
853
854             testKit.testWriteTransaction(dataStore,
855                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
856                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
857                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
858
859             listener.expectNoMoreChanges("Received unexpected change after close");
860         }
861     }
862
863     @Test
864     public void testRestoreFromDatastoreSnapshot() throws Exception {
865         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
866         final String name = "transactionIntegrationTest";
867
868         final ContainerNode carsNode = CarsModel.newCarsNode(
869             CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
870                 CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
871
872         DataTree dataTree = new InMemoryDataTreeFactory().create(
873             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
874         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
875         NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
876
877         final Snapshot carsSnapshot = Snapshot.create(
878             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
879             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
880
881         dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
882             SchemaContextHelper.full());
883
884         final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
885         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
886
887         root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
888
889         final Snapshot peopleSnapshot = Snapshot.create(
890             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
891             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
892
893         testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
894             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
895             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
896
897         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
898             testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
899
900             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
901
902             // two reads
903             Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
904             assertTrue("isPresent", optional.isPresent());
905             assertEquals("Data node", carsNode, optional.get());
906
907             optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
908             assertTrue("isPresent", optional.isPresent());
909             assertEquals("Data node", peopleNode, optional.get());
910         }
911     }
912
913     @Test
914     public void testSnapshotOnRootOverwrite() throws Exception {
915         if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
916             // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
917             return;
918         }
919
920         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
921                 datastoreContextBuilder.snapshotOnRootOverwrite(true));
922         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
923                 testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
924                 true, "cars", "default")) {
925
926             ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
927                     .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
928                     .withChild((ContainerNode) CarsModel.create())
929                     .build();
930
931             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
932             IntegrationTestKit.verifyShardState(dataStore, "cars",
933                 state -> assertEquals(1, state.getSnapshotIndex()));
934
935             // root has been written expect snapshot at index 0
936             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
937
938             for (int i = 0; i < 10; i++) {
939                 testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
940                     CarsModel.newCarEntry("car " + i, Uint64.ONE));
941             }
942
943             // fake snapshot causes the snapshotIndex to move
944             IntegrationTestKit.verifyShardState(dataStore, "cars",
945                 state -> assertEquals(10, state.getSnapshotIndex()));
946
947             // however the real snapshot still has not changed and was taken at index 0
948             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
949
950             // root overwrite so expect a snapshot
951             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
952
953             // this was a real snapshot so everything should be in it(1 + 10 + 1)
954             IntegrationTestKit.verifyShardState(dataStore, "cars",
955                 state -> assertEquals(12, state.getSnapshotIndex()));
956
957             verifySnapshot("member-1-shard-cars-testRootOverwrite", 12, 1);
958         }
959     }
960
961     private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex,
962             final long lastAppliedTerm) {
963         await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
964                 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
965                 assertEquals(1, snap.size());
966                 assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
967                 assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
968             }
969         );
970     }
971 }