Add integration test for segmented journal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / AbstractDistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.junit.runners.Parameterized.Parameter;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20
21 import akka.actor.ActorSystem;
22 import com.google.common.base.Throwables;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.util.concurrent.FluentFuture;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.Uninterruptibles;
28 import java.math.BigInteger;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.Optional;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
42 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
47 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
48 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
49 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
50 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
58 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
59 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
60 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
61 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
62 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
63 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
64 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
65 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
67 import org.opendaylight.yangtools.concepts.ListenerRegistration;
68 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
69 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
75 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
76 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
77 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
78
79 public abstract class AbstractDistributedDataStoreIntegrationTest {
80
81     @Parameter
82     public Class<? extends AbstractDataStore> testParameter;
83
84     protected ActorSystem system;
85
86     protected final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
87             .shardHeartbeatIntervalInMillis(100);
88
89     protected ActorSystem getSystem() {
90         return system;
91     }
92
93     @Test
94     public void testWriteTransactionWithSingleShard() throws Exception {
95         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
96         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
97             testParameter, "transactionIntegrationTest", "test-1")) {
98
99             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
100                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
101
102             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
103                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
104                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
105                 .build());
106         }
107     }
108
109     @Test
110     public void testWriteTransactionWithMultipleShards() throws Exception {
111         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
112         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
113             testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
114
115             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
116             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
117
118             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
119             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
120
121             testKit.doCommit(writeTx.ready());
122
123             writeTx = dataStore.newWriteOnlyTransaction();
124
125             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
126             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
127
128             testKit.doCommit(writeTx.ready());
129
130             writeTx = dataStore.newWriteOnlyTransaction();
131
132             final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
133             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
134             writeTx.write(carPath, car);
135
136             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
137             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
138             writeTx.write(personPath, person);
139
140             testKit.doCommit(writeTx.ready());
141
142             // Verify the data in the store
143             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
144
145             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
146             assertTrue("isPresent", optional.isPresent());
147             assertEquals("Data node", car, optional.get());
148
149             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
150             assertTrue("isPresent", optional.isPresent());
151             assertEquals("Data node", person, optional.get());
152         }
153     }
154
155     @Test
156     public void testReadWriteTransactionWithSingleShard() throws Exception {
157         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
158         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
159             testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
160
161             // 1. Create a read-write Tx
162             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
163             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
164
165             // 2. Write some data
166             final YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
167             final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
168             readWriteTx.write(nodePath, nodeToWrite);
169
170             // 3. Read the data from Tx
171             final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
172             assertEquals("exists", Boolean.TRUE, exists);
173
174             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
175             assertTrue("isPresent", optional.isPresent());
176             assertEquals("Data node", nodeToWrite, optional.get());
177
178             // 4. Ready the Tx for commit
179             final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
180
181             // 5. Commit the Tx
182             testKit.doCommit(cohort);
183
184             // 6. Verify the data in the store
185             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
186
187             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
188             assertTrue("isPresent", optional.isPresent());
189             assertEquals("Data node", nodeToWrite, optional.get());
190         }
191     }
192
193     @Test
194     public void testReadWriteTransactionWithMultipleShards() throws Exception {
195         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
196         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
197             testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
198
199             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
200             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
201
202             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
203             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
204
205             testKit.doCommit(readWriteTx.ready());
206
207             readWriteTx = dataStore.newReadWriteTransaction();
208
209             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
210             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
211
212             testKit.doCommit(readWriteTx.ready());
213
214             readWriteTx = dataStore.newReadWriteTransaction();
215
216             final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
217             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
218             readWriteTx.write(carPath, car);
219
220             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
221             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
222             readWriteTx.write(personPath, person);
223
224             final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
225             assertEquals("exists", Boolean.TRUE, exists);
226
227             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
228             assertTrue("isPresent", optional.isPresent());
229             assertEquals("Data node", car, optional.get());
230
231             testKit.doCommit(readWriteTx.ready());
232
233             // Verify the data in the store
234             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
235
236             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
237             assertTrue("isPresent", optional.isPresent());
238             assertEquals("Data node", car, optional.get());
239
240             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
241             assertTrue("isPresent", optional.isPresent());
242             assertEquals("Data node", person, optional.get());
243         }
244     }
245
246     @Test
247     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
248         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
249         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
250             testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
251
252             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
253
254             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
255             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
256             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
257             testKit.doCommit(writeTx.ready());
258
259             writeTx = txChain.newWriteOnlyTransaction();
260
261             int numCars = 5;
262             for (int i = 0; i < numCars; i++) {
263                 writeTx.write(CarsModel.newCarPath("car" + i),
264                     CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
265             }
266
267             testKit.doCommit(writeTx.ready());
268
269             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
270                     .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
271             assertTrue("isPresent", optional.isPresent());
272             assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
273         }
274     }
275
276     @SuppressWarnings("checkstyle:IllegalCatch")
277     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
278             throws Exception {
279         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
280         final String shardName = "default";
281
282         // We don't want the shard to become the leader so prevent shard
283         // elections.
284         datastoreContextBuilder.customRaftPolicyImplementation(
285                 "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy");
286
287         // The ShardManager uses the election timeout for FindPrimary so
288         // reset it low so it will timeout quickly.
289         datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
290         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
291
292         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
293
294             final Object result = dataStore.getActorUtils().executeOperation(
295                 dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
296             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
297
298             // Create the write Tx.
299             DOMStoreWriteTransaction writeTxToClose = null;
300             try {
301                 writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction()
302                         : dataStore.newReadWriteTransaction();
303                 final DOMStoreWriteTransaction writeTx = writeTxToClose;
304                 assertNotNull("newReadWriteTransaction returned null", writeTx);
305
306                 // Do some modifications and ready the Tx on a separate
307                 // thread.
308                 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
309                 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
310                 final CountDownLatch txReady = new CountDownLatch(1);
311                 final Thread txThread = new Thread(() -> {
312                     try {
313                         writeTx.write(TestModel.JUNK_PATH,
314                             ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
315
316                         txCohort.set(writeTx.ready());
317                     } catch (Exception e) {
318                         caughtEx.set(e);
319                     } finally {
320                         txReady.countDown();
321                     }
322                 });
323
324                 txThread.start();
325
326                 // Wait for the Tx operations to complete.
327                 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
328                 if (caughtEx.get() != null) {
329                     throw caughtEx.get();
330                 }
331
332                 assertTrue("Tx ready", done);
333
334                 // Wait for the commit to complete. Since no shard
335                 // leader was elected in time, the Tx
336                 // should have timed out and throw an appropriate
337                 // exception cause.
338                 try {
339                     txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
340                     fail("Expected NoShardLeaderException");
341                 } catch (final ExecutionException e) {
342                     final String msg = "Unexpected exception: "
343                             + Throwables.getStackTraceAsString(e.getCause());
344                     if (DistributedDataStore.class.equals(testParameter)) {
345                         assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
346                     } else {
347                         assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
348                     }
349                 }
350             } finally {
351                 try {
352                     if (writeTxToClose != null) {
353                         writeTxToClose.close();
354                     }
355                 } catch (Exception e) {
356                     // FIXME TransactionProxy.close throws IllegalStateException:
357                     // Transaction is ready, it cannot be closed
358                 }
359             }
360         }
361     }
362
363     @Test
364     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Exception {
365         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
366         testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader");
367     }
368
369     @Test
370     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Exception {
371         testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader");
372     }
373
374     @Test
375     public void testTransactionAbort() throws Exception {
376         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
377         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
378             testParameter, "transactionAbortIntegrationTest", "test-1")) {
379
380             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
381             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
382
383             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
384
385             final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
386
387             cohort.canCommit().get(5, TimeUnit.SECONDS);
388
389             cohort.abort().get(5, TimeUnit.SECONDS);
390
391             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
392                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
393         }
394     }
395
396     @Test
397     @SuppressWarnings("checkstyle:IllegalCatch")
398     public void testTransactionChainWithSingleShard() throws Exception {
399         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
400         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
401             testParameter, "testTransactionChainWithSingleShard", "test-1")) {
402
403             // 1. Create a Tx chain and write-only Tx
404             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
405
406             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
407             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
408
409             // 2. Write some data
410             final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
411             writeTx.write(TestModel.TEST_PATH, testNode);
412
413             // 3. Ready the Tx for commit
414             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
415
416             // 4. Commit the Tx on another thread that first waits for
417             // the second read Tx.
418             final CountDownLatch continueCommit1 = new CountDownLatch(1);
419             final CountDownLatch commit1Done = new CountDownLatch(1);
420             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
421             new Thread(() -> {
422                 try {
423                     continueCommit1.await();
424                     testKit.doCommit(cohort1);
425                 } catch (Exception e) {
426                     commit1Error.set(e);
427                 } finally {
428                     commit1Done.countDown();
429                 }
430             }).start();
431
432             // 5. Create a new read Tx from the chain to read and verify
433             // the data from the first
434             // Tx is visible after being readied.
435             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
436             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
437             assertTrue("isPresent", optional.isPresent());
438             assertEquals("Data node", testNode, optional.get());
439
440             // 6. Create a new RW Tx from the chain, write more data,
441             // and ready it
442             final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
443             final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
444                     .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
445                     .build();
446             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
447
448             final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
449
450             // 7. Create a new read Tx from the chain to read the data
451             // from the last RW Tx to
452             // verify it is visible.
453             readTx = txChain.newReadWriteTransaction();
454             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
455             assertTrue("isPresent", optional.isPresent());
456             assertEquals("Data node", outerNode, optional.get());
457
458             // 8. Wait for the 2 commits to complete and close the
459             // chain.
460             continueCommit1.countDown();
461             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
462
463             if (commit1Error.get() != null) {
464                 throw commit1Error.get();
465             }
466
467             testKit.doCommit(cohort2);
468
469             txChain.close();
470
471             // 9. Create a new read Tx from the data store and verify
472             // committed data.
473             readTx = dataStore.newReadOnlyTransaction();
474             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
475             assertTrue("isPresent", optional.isPresent());
476             assertEquals("Data node", outerNode, optional.get());
477         }
478     }
479
480     @Test
481     public void testTransactionChainWithMultipleShards() throws Exception {
482         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
483         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
484             testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
485
486             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
487
488             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
489             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
490
491             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
492             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
493
494             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
495             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
496
497             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
498
499             final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
500
501             final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
502             final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
503             readWriteTx.write(carPath, car);
504
505             final MapEntryNode person = PeopleModel.newPersonEntry("jack");
506             final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
507             readWriteTx.merge(personPath, person);
508
509             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
510             assertTrue("isPresent", optional.isPresent());
511             assertEquals("Data node", car, optional.get());
512
513             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
514             assertTrue("isPresent", optional.isPresent());
515             assertEquals("Data node", person, optional.get());
516
517             final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
518
519             writeTx = txChain.newWriteOnlyTransaction();
520
521             writeTx.delete(carPath);
522
523             final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
524
525             final ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
526             final ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
527
528             testKit.doCommit(canCommit1, cohort1);
529             testKit.doCommit(canCommit2, cohort2);
530             testKit.doCommit(cohort3);
531
532             txChain.close();
533
534             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
535
536             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
537             assertFalse("isPresent", optional.isPresent());
538
539             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
540             assertTrue("isPresent", optional.isPresent());
541             assertEquals("Data node", person, optional.get());
542         }
543     }
544
545     @Test
546     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
547         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
548         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
549             testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
550
551             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
552                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
553                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
554                 MoreExecutors.directExecutor());
555
556             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
557             DOMTransactionChain txChain = broker.createTransactionChain(listener);
558
559             final List<ListenableFuture<?>> futures = new ArrayList<>();
560
561             final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
562             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
563             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
564             futures.add(writeTx.commit());
565
566             int numCars = 100;
567             for (int i = 0; i < numCars; i++) {
568                 final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
569
570                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
571                     CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
572
573                 futures.add(rwTx.commit());
574             }
575
576             for (final ListenableFuture<?> f : futures) {
577                 f.get(5, TimeUnit.SECONDS);
578             }
579
580             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
581                     .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
582             assertTrue("isPresent", optional.isPresent());
583             assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
584
585             txChain.close();
586
587             broker.close();
588         }
589     }
590
591     @Test
592     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
593         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
594         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
595             testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
596
597             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
598
599             final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
600
601             rwTx1.ready();
602
603             final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
604
605             final Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
606             assertFalse("isPresent", optional.isPresent());
607
608             txChain.close();
609         }
610     }
611
612     @Test
613     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
614         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
615         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
616             testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
617
618             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
619
620             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
621             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
622
623             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
624
625             // Try to create another Tx of each type - each should fail
626             // b/c the previous Tx wasn't
627             // readied.
628             testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
629         }
630     }
631
632     @Test
633     public void testCreateChainedTransactionAfterClose() throws Exception {
634         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
635         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
636             testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
637
638             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
639             txChain.close();
640
641             // Try to create another Tx of each type - should fail b/c
642             // the previous Tx was closed.
643             testKit.assertExceptionOnTxChainCreates(txChain, DOMTransactionChainClosedException.class);
644         }
645     }
646
647     @Test
648     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
649         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
650         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
651             testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
652
653             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
654
655             // Create a write tx and submit.
656             final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
657             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
658             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
659
660             // Create read-only tx's and issue a read.
661             FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture1 = txChain
662                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
663
664             FluentFuture<Optional<NormalizedNode<?, ?>>> readFuture2 = txChain
665                     .newReadOnlyTransaction().read(TestModel.TEST_PATH);
666
667             // Create another write tx and issue the write.
668             DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction();
669             writeTx2.write(TestModel.OUTER_LIST_PATH,
670                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
671                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
672                 .build());
673
674             // Ensure the reads succeed.
675
676             assertTrue("isPresent", readFuture1.get(5, TimeUnit.SECONDS).isPresent());
677             assertTrue("isPresent", readFuture2.get(5, TimeUnit.SECONDS).isPresent());
678
679             // Ensure the writes succeed.
680             DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready();
681
682             testKit.doCommit(cohort1);
683             testKit.doCommit(cohort2);
684
685             assertTrue("isPresent", txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH)
686                 .get(5, TimeUnit.SECONDS).isPresent());
687         }
688     }
689
690     @Test
691     public void testChainedTransactionFailureWithSingleShard() throws Exception {
692         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
693         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
694             testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
695
696             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
697                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
698                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
699                 MoreExecutors.directExecutor());
700
701             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
702             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
703
704             final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
705
706             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
707                 PeopleModel.emptyContainer());
708
709             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
710                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
711                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
712
713             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
714
715             try {
716                 writeTx.commit().get(5, TimeUnit.SECONDS);
717                 fail("Expected TransactionCommitFailedException");
718             } catch (final ExecutionException e) {
719                 // Expected
720             }
721
722             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
723                 any(Throwable.class));
724
725             txChain.close();
726             broker.close();
727         }
728     }
729
730     @Test
731     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
732         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
733         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
734             testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
735
736             final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
737                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
738                 .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
739                 MoreExecutors.directExecutor());
740
741             final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
742             final DOMTransactionChain txChain = broker.createTransactionChain(listener);
743
744             final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
745
746             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
747                 PeopleModel.emptyContainer());
748
749             final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
750                     .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
751                     .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
752
753             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
754
755             // Note that merge will validate the data and fail but put
756             // succeeds b/c deep validation is not
757             // done for put for performance reasons.
758             try {
759                 writeTx.commit().get(5, TimeUnit.SECONDS);
760                 fail("Expected TransactionCommitFailedException");
761             } catch (final ExecutionException e) {
762                 // Expected
763             }
764
765             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
766                 any(Throwable.class));
767
768             txChain.close();
769             broker.close();
770         }
771     }
772
773     @Test
774     public void testDataTreeChangeListenerRegistration() throws Exception {
775         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
776         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
777             testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
778
779             testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
780                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
781
782             final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
783
784             ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
785                     .registerTreeChangeListener(TestModel.TEST_PATH, listener);
786
787             assertNotNull("registerTreeChangeListener returned null", listenerReg);
788
789             IntegrationTestKit.verifyShardState(dataStore, "test-1",
790                 state -> assertEquals("getTreeChangeListenerActors", 1,
791                         state.getTreeChangeListenerActors().size()));
792
793             // Wait for the initial notification
794             listener.waitForChangeEvents(TestModel.TEST_PATH);
795             listener.reset(2);
796
797             // Write 2 updates.
798             testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
799                 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
800                 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
801                 .build());
802
803             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
804                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
805             testKit.testWriteTransaction(dataStore, listPath,
806                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
807
808             // Wait for the 2 updates.
809             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
810             listenerReg.close();
811
812             IntegrationTestKit.verifyShardState(dataStore, "test-1",
813                 state -> assertEquals("getTreeChangeListenerActors", 0,
814                     state.getTreeChangeListenerActors().size()));
815
816             testKit.testWriteTransaction(dataStore,
817                 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
818                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
819                 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
820
821             listener.expectNoMoreChanges("Received unexpected change after close");
822         }
823     }
824
825     @Test
826     public void testRestoreFromDatastoreSnapshot() throws Exception {
827         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
828         final String name = "transactionIntegrationTest";
829
830         final ContainerNode carsNode = CarsModel.newCarsNode(
831             CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)),
832                 CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L))));
833
834         DataTree dataTree = new InMemoryDataTreeFactory().create(
835             DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
836         AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
837         NormalizedNode<?, ?> root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
838
839         final Snapshot carsSnapshot = Snapshot.create(
840             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
841             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
842
843         dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
844             SchemaContextHelper.full());
845
846         final NormalizedNode<?, ?> peopleNode = PeopleModel.create();
847         AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
848
849         root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY);
850
851         final Snapshot peopleSnapshot = Snapshot.create(
852             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
853             Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null);
854
855         testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList(
856             new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
857             new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
858
859         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
860             testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
861
862             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
863
864             // two reads
865             Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
866             assertTrue("isPresent", optional.isPresent());
867             assertEquals("Data node", carsNode, optional.get());
868
869             optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
870             assertTrue("isPresent", optional.isPresent());
871             assertEquals("Data node", peopleNode, optional.get());
872         }
873     }
874 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.