Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertFalse;
15 import static org.junit.Assert.assertNotNull;
16 import static org.junit.Assert.assertThrows;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.timeout;
21 import static org.mockito.Mockito.verify;
22
23 import akka.actor.ActorSystem;
24 import com.google.common.base.Throwables;
25 import com.google.common.collect.ImmutableMap;
26 import com.google.common.util.concurrent.FluentFuture;
27 import com.google.common.util.concurrent.FutureCallback;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.Uninterruptibles;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Optional;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.Ignore;
42 import org.junit.Test;
43 import org.junit.runners.Parameterized.Parameter;
44 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
45 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
46 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
47 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
48 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
50 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
51 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
52 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
53 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
54 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
55 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
56 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
65 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
67 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
68 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
69 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
72 import org.opendaylight.yangtools.yang.common.Uint64;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
74 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
75 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
77 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
78 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
79 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
80 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
81 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
82 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
83 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
84 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
85
86 public abstract class AbstractDistributedDataStoreIntegrationTest {
87     @Parameter
88     public Class<? extends ClientBackedDataStore> testParameter;
89
90     protected ActorSystem system;
91
92     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
93             .shardHeartbeatIntervalInMillis(100);
94
95     protected ActorSystem getSystem() {
96         return system;
97     }
98
99     @Test
100     public void testWriteTransactionWithSingleShard() throws Exception {
101         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
102         try (var dataStore = testKit.setupDataStore(testParameter, "transactionIntegrationTest", "test-1")) {
103
104             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
105                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
106
107             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
108                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
109                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
110                 .build());
111         }
112     }
113
114     @Test
115     public void testWriteTransactionWithMultipleShards() throws Exception {
116         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
117         try (var dataStore = testKit.setupDataStore(testParameter, "testWriteTransactionWithMultipleShards",
118             "cars-1", "people-1")) {
119
120             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
121             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
122
123             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
124             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
125
126             testKit.doCommit(writeTx.ready());
127
128             writeTx = dataStore.newWriteOnlyTransaction();
129
130             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
131             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
132
133             testKit.doCommit(writeTx.ready());
134
135             writeTx = dataStore.newWriteOnlyTransaction();
136
137             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
138             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
139             writeTx.write(carPath, car);
140
141             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
142             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
143             writeTx.write(personPath, person);
144
145             testKit.doCommit(writeTx.ready());
146
147             // Verify the data in the store
148             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
149
150             assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
151             assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
152         }
153     }
154
155     @Test
156     public void testReadWriteTransactionWithSingleShard() throws Exception {
157         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
158         try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithSingleShard",
159             "test-1")) {
160
161             // 1. Create a read-write Tx
162             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
163             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
164
165             // 2. Write some data
166             final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
167             final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
168             readWriteTx.write(nodePath, nodeToWrite);
169
170             // 3. Read the data from Tx
171             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
172             assertEquals("exists", Boolean.TRUE, exists);
173
174             assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS));
175
176             // 4. Ready the Tx for commit
177             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
178
179             // 5. Commit the Tx
180             testKit.doCommit(cohort);
181
182             // 6. Verify the data in the store
183             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
184
185             assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
186         }
187     }
188
189     @Test
190     public void testReadWriteTransactionWithMultipleShards() throws Exception {
191         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
192         try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithMultipleShards",
193             "cars-1", "people-1")) {
194
195             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
196             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
197
198             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
199             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
200
201             testKit.doCommit(readWriteTx.ready());
202
203             readWriteTx = dataStore.newReadWriteTransaction();
204
205             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
206             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
207
208             testKit.doCommit(readWriteTx.ready());
209
210             readWriteTx = dataStore.newReadWriteTransaction();
211
212             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
213             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
214             readWriteTx.write(carPath, car);
215
216             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
217             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
218             readWriteTx.write(personPath, person);
219
220             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
221             assertEquals("exists", Boolean.TRUE, exists);
222
223             assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
224
225             testKit.doCommit(readWriteTx.ready());
226
227             // Verify the data in the store
228             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
229
230             assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
231             assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
232         }
233     }
234
235     @Test
236     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
237         final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
238         try (var dataStore = testKit.setupDataStore(testParameter, "testSingleTransactionsWritesInQuickSuccession",
239             "cars-1")) {
240
241             final var txChain = dataStore.createTransactionChain();
242
243             var writeTx = txChain.newWriteOnlyTransaction();
244             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
245             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
246             testKit.doCommit(writeTx.ready());
247
248             int numCars = 5;
249             for (int i = 0; i < numCars; i++) {
250                 writeTx = txChain.newWriteOnlyTransaction();
251                 writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
252
253                 testKit.doCommit(writeTx.ready());
254
255                 try (var tx = txChain.newReadOnlyTransaction()) {
256                     tx.read(CarsModel.BASE_PATH).get();
257                 }
258             }
259
260             // wait to let the shard catch up with purged
261             await("transaction state propagation").atMost(5, TimeUnit.SECONDS)
262                 .pollInterval(500, TimeUnit.MILLISECONDS)
263                 .untilAsserted(() -> {
264                     // verify frontend metadata has no holes in purged transactions causing overtime memory leak
265                     final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
266                     final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
267                             .executeOperation(localShard, new RequestFrontendMetadata());
268
269                     final var clientMeta = frontendMetadata.getClients().get(0);
270                     final var iterator = clientMeta.getCurrentHistories().iterator();
271                     var metadata = iterator.next();
272                     while (iterator.hasNext() && metadata.getHistoryId() != 1) {
273                         metadata = iterator.next();
274                     }
275                     assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
276                 });
277
278             final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
279                 .get(5, TimeUnit.SECONDS)
280                 .orElseThrow()
281                 .body();
282             assertThat(body, instanceOf(Collection.class));
283             assertEquals("# cars", numCars, ((Collection<?>) body).size());
284         }
285     }
286
287     @SuppressWarnings("checkstyle:IllegalCatch")
288     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
289             throws Exception {
290         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
291         final String shardName = "default";
292
293         // We don't want the shard to become the leader so prevent shard
294         // elections.
295         datastoreContextBuilder.customRaftPolicyImplementation(
296                 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
297
298         // The ShardManager uses the election timeout for FindPrimary so
299         // reset it low so it will timeout quickly.
300         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
301         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
302
303         try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
304             final Object result = dataStore.getActorUtils().executeOperation(
305                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
306             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
307
308             // Create the write Tx.
309             DOMStoreWriteTransaction writeTxToClose = null;
310             try {
311                 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
312                         : dataStore.newReadWriteTransaction();
313                 final DOMStoreWriteTransaction writeTx = writeTxToClose;
314                 assertNotNull("newReadWriteTransaction returned null", writeTx);
315
316                 // Do some modifications and ready the Tx on a separate
317                 // thread.
318                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
319                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
320                 final CountDownLatch txReady = new CountDownLatch(1);
321                 final Thread txThread = new Thread(() -> {
322                     try {
323                         writeTx.write(TestModel.JUNK_PATH,
324                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
325
326                         txCohort.set(writeTx.ready());
327                     } catch (Exception e) {
328                         caughtEx.set(e);
329                     } finally {
330                         txReady.countDown();
331                     }
332                 });
333
334                 txThread.start();
335
336                 // Wait for the Tx operations to complete.
337                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
338                 if (caughtEx.get() != null) {
339                     throw caughtEx.get();
340                 }
341
342                 assertTrue("Tx ready", done);
343
344                 // Wait for the commit to complete. Since no shard
345                 // leader was elected in time, the Tx
346                 // should have timed out and throw an appropriate
347                 // exception cause.
348                 final var ex = assertThrows(ExecutionException.class,
349                     () -> txCohort.get().canCommit().get(10, TimeUnit.SECONDS));
350                 assertTrue("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
351                     Throwables.getRootCause(ex) instanceof RequestTimeoutException);
352             } finally {
353                 try {
354                     if (writeTxToClose != null) {
355                         writeTxToClose.close();
356                     }
357                 } catch (Exception e) {
358                     // FIXME TransactionProxy.close throws IllegalStateException:
359                     // Transaction is ready, it cannot be closed
360                 }
361             }
362         }
363     }
364
365     @Test
366     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
367         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
368         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
369     }
370
371     @Test
372     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
373         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
374     }
375
376     @Test
377     public void testTransactionAbort() throws Exception {
378         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
379         try (var dataStore = testKit.setupDataStore(testParameter, "transactionAbortIntegrationTest", "test-1")) {
380
381             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
382             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
383
384             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
385
386             final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
387
388             cohort.canCommit().get(5, TimeUnit.SECONDS);
389
390             cohort.abort().get(5, TimeUnit.SECONDS);
391
392             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
393                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
394         }
395     }
396
397     @Test
398     @SuppressWarnings("checkstyle:IllegalCatch")
399     public void testTransactionChainWithSingleShard() throws Exception {
400         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
401         try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithSingleShard", "test-1")) {
402
403             // 1. Create a Tx chain and write-only Tx
404             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
405
406             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
407             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
408
409             // 2. Write some data
410             final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
411             writeTx.write(TestModel.TEST_PATH, testNode);
412
413             // 3. Ready the Tx for commit
414             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
415
416             // 4. Commit the Tx on another thread that first waits for
417             // the second read Tx.
418             final CountDownLatch continueCommit1 = new CountDownLatch(1);
419             final CountDownLatch commit1Done = new CountDownLatch(1);
420             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
421             new Thread(() -> {
422                 try {
423                     continueCommit1.await();
424                     testKit.doCommit(cohort1);
425                 } catch (Exception e) {
426                     commit1Error.set(e);
427                 } finally {
428                     commit1Done.countDown();
429                 }
430             }).start();
431
432             // 5. Create a new read Tx from the chain to read and verify
433             // the data from the first
434             // Tx is visible after being readied.
435             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
436             assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
437
438             // 6. Create a new RW Tx from the chain, write more data,
439             // and ready it
440             final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
441             final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
442                     .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
443                     .build();
444             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
445
446             final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
447
448             // 7. Create a new read Tx from the chain to read the data
449             // from the last RW Tx to
450             // verify it is visible.
451             readTx = txChain.newReadWriteTransaction();
452             assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
453
454             // 8. Wait for the 2 commits to complete and close the
455             // chain.
456             continueCommit1.countDown();
457             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
458
459             if (commit1Error.get() != null) {
460                 throw commit1Error.get();
461             }
462
463             testKit.doCommit(cohort2);
464
465             txChain.close();
466
467             // 9. Create a new read Tx from the data store and verify
468             // committed data.
469             readTx = dataStore.newReadOnlyTransaction();
470             assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
471         }
472     }
473
474     @Test
475     public void testTransactionChainWithMultipleShards() throws Exception {
476         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
477         try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithMultipleShards",
478             "cars-1", "people-1")) {
479
480             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
481
482             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
483             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
484
485             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
486             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
487
488             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
489             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
490
491             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
492
493             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
494
495             final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
496             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
497             readWriteTx.write(carPath, car);
498
499             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
500             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
501             readWriteTx.merge(personPath, person);
502
503             assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
504             assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
505
506             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
507
508             writeTx = txChain.newWriteOnlyTransaction();
509
510             writeTx.delete(carPath);
511
512             final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
513
514             final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
515             final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
516
517             testKit.doCommit(canCommit1, cohort1);
518             testKit.doCommit(canCommit2, cohort2);
519             testKit.doCommit(cohort3);
520
521             txChain.close();
522
523             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
524
525             assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS));
526             assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
527         }
528     }
529
530     @Test
531     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
532         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
533         try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionsInQuickSuccession",
534             "cars-1")) {
535
536             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
537                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
538                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
539                 MoreExecutors.directExecutor());
540
541             DOMTransactionChain txChain = broker.createTransactionChain();
542
543             final List<ListenableFuture<?>> futures = new ArrayList<>();
544
545             final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
546             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
547             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
548             futures.add(writeTx.commit());
549
550             int numCars = 100;
551             for (int i = 0; i < numCars; i++) {
552                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
553
554                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
555                     CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
556
557                 futures.add(rwTx.commit());
558             }
559
560             for (final ListenableFuture<?> f : futures) {
561                 f.get(5, TimeUnit.SECONDS);
562             }
563
564             final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
565                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
566             assertTrue("isPresent", optional.isPresent());
567             assertEquals("# cars", numCars, ((Collection<?>) optional.orElseThrow().body()).size());
568
569             txChain.close();
570
571             broker.close();
572         }
573     }
574
575     @Test
576     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
577         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
578         try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterEmptyTxReadied",
579             "test-1")) {
580
581             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
582
583             final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
584
585             rwTx1.ready();
586
587             final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
588
589             final Optional<NormalizedNode> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
590             assertFalse("isPresent", optional.isPresent());
591
592             txChain.close();
593         }
594     }
595
596     @Test
597     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
598         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
599         try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionWhenPreviousNotReady",
600             "test-1")) {
601
602             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
603
604             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
605             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
606
607             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
608
609             // Try to create another Tx of each type - each should fail
610             // b/c the previous Tx wasn't
611             // readied.
612             testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
613         }
614     }
615
616     @Test
617     public void testCreateChainedTransactionAfterClose() throws Exception {
618         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
619         try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterClose",
620             "test-1")) {
621
622             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
623             txChain.close();
624
625             // Try to create another Tx of each type - should fail b/c
626             // the previous Tx was closed.
627             testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
628         }
629     }
630
631     @Test
632     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
633         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
634         try (var dataStore = testKit.setupDataStore(testParameter, "testChainWithReadOnlyTxAfterPreviousReady",
635             "test-1")) {
636
637             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
638
639             // Create a write tx and submit.
640             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
641             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
642             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
643
644             // Create read-only tx's and issue a read.
645             FluentFuture<Optional<NormalizedNode>> readFuture1 = txChain
646                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
647
648             FluentFuture<Optional<NormalizedNode>> readFuture2 = txChain
649                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
650
651             // Create another write tx and issue the write.
652             DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
653             writeTx2.write(TestModel.OUTER_LIST_PATH,
654                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
655                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
656                 .build());
657
658             // Ensure the reads succeed.
659
660             assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
661             assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
662
663             // Ensure the writes succeed.
664             DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
665
666             testKit.doCommit(cohort1);
667             testKit.doCommit(cohort2);
668
669             assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
670                 .get(5, TimeUnit.SECONDS).isPresent());
671         }
672     }
673
674     @Test
675     public void testChainedTransactionFailureWithSingleShard() throws Exception {
676         final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
677         try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithSingleShard",
678             "cars-1")) {
679
680             final var broker = new ConcurrentDOMDataBroker(
681                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
682                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
683                 MoreExecutors.directExecutor());
684
685             final var listener = mock(FutureCallback.class);
686             final var txChain = broker.createTransactionChain();
687             txChain.addCallback(listener);
688
689             final var writeTx = txChain.newReadWriteTransaction();
690
691             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
692                 PeopleModel.emptyContainer());
693
694             final var invalidData = Builders.containerBuilder()
695                     .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
696                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
697                     .build();
698
699             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
700
701             assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
702
703             verify(listener, timeout(5000)).onFailure(any());
704
705             txChain.close();
706             broker.close();
707         }
708     }
709
710     @Test
711     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
712         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
713         try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithMultipleShards",
714             "cars-1", "people-1")) {
715
716             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
717                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
718                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
719                 MoreExecutors.directExecutor());
720
721             final var listener = mock(FutureCallback.class);
722             final DOMTransactionChain txChain = broker.createTransactionChain();
723             txChain.addCallback(listener);
724
725             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
726
727             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
728                 PeopleModel.emptyContainer());
729
730             final ContainerNode invalidData = Builders.containerBuilder()
731                 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
732                 .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
733                 .build();
734
735             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
736
737             // Note that merge will validate the data and fail but put
738             // succeeds b/c deep validation is not
739             // done for put for performance reasons.
740             assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
741
742             verify(listener, timeout(5000)).onFailure(any());
743
744             txChain.close();
745             broker.close();
746         }
747     }
748
749     @Test
750     public void testDataTreeChangeListenerRegistration() throws Exception {
751         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
752         try (var dataStore = testKit.setupDataStore(testParameter, "testDataTreeChangeListenerRegistration",
753             "test-1")) {
754
755             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
756                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
757
758             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
759
760             final var listenerReg = dataStore.registerTreeChangeListener(TestModel.TEST_PATH, listener);
761
762             assertNotNull("registerTreeChangeListener returned null", listenerReg);
763
764             IntegrationTestKit.verifyShardState(dataStore, "test-1",
765                 state -> assertEquals("getTreeChangeListenerActors", 1,
766                         state.getTreeChangeListenerActors().size()));
767
768             // Wait for the initial notification
769             listener.waitForChangeEvents(TestModel.TEST_PATH);
770             listener.reset(2);
771
772             // Write 2 updates.
773             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
774                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
775                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
776                 .build());
777
778             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
779                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
780             testKit.testWriteTransaction(dataStore, listPath,
781                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
782
783             // Wait for the 2 updates.
784             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
785             listenerReg.close();
786
787             IntegrationTestKit.verifyShardState(dataStore, "test-1",
788                 state -> assertEquals("getTreeChangeListenerActors", 0,
789                     state.getTreeChangeListenerActors().size()));
790
791             testKit.testWriteTransaction(dataStore,
792                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
793                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
794                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
795
796             listener.expectNoMoreChanges("Received unexpected change after close");
797         }
798     }
799
800     @Test
801     public void testRestoreFromDatastoreSnapshot() throws Exception {
802         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
803         final String name = "transactionIntegrationTest";
804
805         final ContainerNode carsNode = CarsModel.newCarsNode(
806             CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000)),
807                 CarsModel.newCarEntry("sportage", Uint64.valueOf(30000))));
808
809         DataTree dataTree = new InMemoryDataTreeFactory().create(
810             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
811         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
812         NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
813
814         final Snapshot carsSnapshot = Snapshot.create(
815             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
816             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
817
818         dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
819             SchemaContextHelper.full());
820
821         final NormalizedNode peopleNode = PeopleModel.create();
822         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
823
824         root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
825
826         final Snapshot peopleSnapshot = Snapshot.create(
827             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
828             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
829
830         testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
831             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
832             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
833
834         try (var dataStore = testKit.setupDataStore(testParameter, name, "module-shards-member1.conf", true,
835             "cars", "people")) {
836
837             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
838
839             // two reads
840             assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
841             assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS));
842         }
843     }
844
845     @Test
846     @Ignore("ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate")
847     // FIXME: re-enable this test
848     public void testSnapshotOnRootOverwrite() throws Exception {
849         final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
850         try (var dataStore = testKit.setupDataStore(testParameter, "testRootOverwrite",
851             "module-shards-default-cars-member1.conf", true, "cars", "default")) {
852
853             final var rootNode = Builders.containerBuilder()
854                 .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
855                 .withChild(CarsModel.create())
856                 .build();
857
858             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
859             IntegrationTestKit.verifyShardState(dataStore, "cars",
860                 state -> assertEquals(1, state.getSnapshotIndex()));
861
862             // root has been written expect snapshot at index 0
863             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
864
865             for (int i = 0; i < 10; i++) {
866                 testKit.testWriteTransaction(dataStore, CarsModel.newCarPath("car " + i),
867                     CarsModel.newCarEntry("car " + i, Uint64.ONE));
868             }
869
870             // fake snapshot causes the snapshotIndex to move
871             IntegrationTestKit.verifyShardState(dataStore, "cars",
872                 state -> assertEquals(10, state.getSnapshotIndex()));
873
874             // however the real snapshot still has not changed and was taken at index 0
875             verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
876
877             // root overwrite so expect a snapshot
878             testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
879
880             // this was a real snapshot so everything should be in it(1 + 10 + 1)
881             IntegrationTestKit.verifyShardState(dataStore, "cars",
882                 state -> assertEquals(12, state.getSnapshotIndex()));
883
884             verifySnapshot("member-1-shard-cars-testRootOverwrite", 12, 1);
885         }
886     }
887
888     private static void verifySnapshot(final String persistenceId, final long lastAppliedIndex,
889             final long lastAppliedTerm) {
890         await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
891                 List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
892                 assertEquals(1, snap.size());
893                 assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex());
894                 assertEquals(lastAppliedTerm, snap.get(0).getLastAppliedTerm());
895             }
896         );
897     }
898 }