Bug 2318: Ensure previous Tx in chain is readied before creating the next
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSystem;
7 import akka.actor.PoisonPill;
8 import com.google.common.base.Optional;
9 import com.google.common.util.concurrent.CheckedFuture;
10 import com.google.common.util.concurrent.Uninterruptibles;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.atomic.AtomicReference;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
18 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
19 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
20 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
22 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
23 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
24 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39
40 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
41
42     private final DatastoreContext.Builder datastoreContextBuilder =
43             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
44
45     @Test
46     public void testWriteTransactionWithSingleShard() throws Exception{
47         new IntegrationTestKit(getSystem()) {{
48             DistributedDataStore dataStore =
49                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
50
51             testWriteTransaction(dataStore, TestModel.TEST_PATH,
52                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
53
54             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
55                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
56
57             cleanup(dataStore);
58         }};
59     }
60
61     @Test
62     public void testWriteTransactionWithMultipleShards() throws Exception{
63         new IntegrationTestKit(getSystem()) {{
64             DistributedDataStore dataStore =
65                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
66
67             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
68             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
69
70             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
71             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
72             writeTx.write(nodePath1, nodeToWrite1);
73
74             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
75             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
76             writeTx.write(nodePath2, nodeToWrite2);
77
78             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
79
80             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
81             assertEquals("canCommit", true, canCommit);
82             cohort.preCommit().get(5, TimeUnit.SECONDS);
83             cohort.commit().get(5, TimeUnit.SECONDS);
84
85             // Verify the data in the store
86
87             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
88
89             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
90             assertEquals("isPresent", true, optional.isPresent());
91             assertEquals("Data node", nodeToWrite1, optional.get());
92
93             optional = readTx.read(nodePath2).get();
94             assertEquals("isPresent", true, optional.isPresent());
95             assertEquals("Data node", nodeToWrite2, optional.get());
96
97             cleanup(dataStore);
98         }};
99     }
100
101     @Test
102     public void testReadWriteTransaction() throws Exception{
103         System.setProperty("shard.persistent", "true");
104         new IntegrationTestKit(getSystem()) {{
105             DistributedDataStore dataStore =
106                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
107
108             // 1. Create a read-write Tx
109
110             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
111             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
112
113             // 2. Write some data
114
115             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
116             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
117             readWriteTx.write(nodePath, nodeToWrite );
118
119             // 3. Read the data from Tx
120
121             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
122             assertEquals("exists", true, exists);
123
124             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
125             assertEquals("isPresent", true, optional.isPresent());
126             assertEquals("Data node", nodeToWrite, optional.get());
127
128             // 4. Ready the Tx for commit
129
130             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
131
132             // 5. Commit the Tx
133
134             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
135             assertEquals("canCommit", true, canCommit);
136             cohort.preCommit().get(5, TimeUnit.SECONDS);
137             cohort.commit().get(5, TimeUnit.SECONDS);
138
139             // 6. Verify the data in the store
140
141             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
142
143             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
144             assertEquals("isPresent", true, optional.isPresent());
145             assertEquals("Data node", nodeToWrite, optional.get());
146
147             cleanup(dataStore);
148         }};
149     }
150
151     @Test
152     public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
153         new IntegrationTestKit(getSystem()) {{
154             String testName = "testTransactionWritesWithShardNotInitiallyReady";
155             String shardName = "test-1";
156
157             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
158             // initialized until we create and submit the write the Tx.
159             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
160             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
161             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
162
163             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
164
165             // Create the write Tx
166
167             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
168             assertNotNull("newReadWriteTransaction returned null", writeTx);
169
170             // Do some modification operations and ready the Tx on a separate thread.
171
172             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174                             TestModel.ID_QNAME, 1).build();
175
176             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178             final CountDownLatch txReady = new CountDownLatch(1);
179             Thread txThread = new Thread() {
180                 @Override
181                 public void run() {
182                     try {
183                         writeTx.write(TestModel.TEST_PATH,
184                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
185
186                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187                                 TestModel.OUTER_LIST_QNAME).build());
188
189                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
191
192                         writeTx.delete(listEntryPath);
193
194                         txCohort.set(writeTx.ready());
195                     } catch(Exception e) {
196                         caughtEx.set(e);
197                         return;
198                     } finally {
199                         txReady.countDown();
200                     }
201                 }
202             };
203
204             txThread.start();
205
206             // Wait for the Tx operations to complete.
207
208             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
209             if(caughtEx.get() != null) {
210                 throw caughtEx.get();
211             }
212
213             assertEquals("Tx ready", true, done);
214
215             // At this point the Tx operations should be waiting for the shard to initialize so
216             // trigger the latch to let the shard recovery to continue.
217
218             blockRecoveryLatch.countDown();
219
220             // Wait for the Tx commit to complete.
221
222             assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
223             txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
224             txCohort.get().commit().get(5, TimeUnit.SECONDS);
225
226             // Verify the data in the store
227
228             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
229
230             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
231                     get(5, TimeUnit.SECONDS);
232             assertEquals("isPresent", true, optional.isPresent());
233
234             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
235             assertEquals("isPresent", true, optional.isPresent());
236
237             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
238             assertEquals("isPresent", false, optional.isPresent());
239
240             cleanup(dataStore);
241         }};
242     }
243
244     @Test
245     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
246         new IntegrationTestKit(getSystem()) {{
247             String testName = "testTransactionReadsWithShardNotInitiallyReady";
248             String shardName = "test-1";
249
250             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
251             // initialized until we create the Tx.
252             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
253             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
254             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
255
256             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
257
258             // Create the read-write Tx
259
260             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
261             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
262
263             // Do some reads on the Tx on a separate thread.
264
265             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
266                     new AtomicReference<>();
267             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
268                     txReadFuture = new AtomicReference<>();
269             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
270             final CountDownLatch txReadsDone = new CountDownLatch(1);
271             Thread txThread = new Thread() {
272                 @Override
273                 public void run() {
274                     try {
275                         readWriteTx.write(TestModel.TEST_PATH,
276                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
277
278                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
279
280                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
281                     } catch(Exception e) {
282                         caughtEx.set(e);
283                         return;
284                     } finally {
285                         txReadsDone.countDown();
286                     }
287                 }
288             };
289
290             txThread.start();
291
292             // Wait for the Tx operations to complete.
293
294             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
295             if(caughtEx.get() != null) {
296                 throw caughtEx.get();
297             }
298
299             assertEquals("Tx reads done", true, done);
300
301             // At this point the Tx operations should be waiting for the shard to initialize so
302             // trigger the latch to let the shard recovery to continue.
303
304             blockRecoveryLatch.countDown();
305
306             // Wait for the reads to complete and verify.
307
308             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
309             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
310
311             readWriteTx.close();
312
313             cleanup(dataStore);
314         }};
315     }
316
317     @Test(expected=NotInitializedException.class)
318     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
319         new IntegrationTestKit(getSystem()) {{
320             String testName = "testTransactionCommitFailureWithShardNotInitialized";
321             String shardName = "test-1";
322
323             // Set the shard initialization timeout low for the test.
324
325             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
326
327             // Setup the InMemoryJournal to block shard recovery indefinitely.
328
329             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
330             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
331             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
332
333             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
334
335             // Create the write Tx
336
337             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
338             assertNotNull("newReadWriteTransaction returned null", writeTx);
339
340             // Do some modifications and ready the Tx on a separate thread.
341
342             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
343             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
344             final CountDownLatch txReady = new CountDownLatch(1);
345             Thread txThread = new Thread() {
346                 @Override
347                 public void run() {
348                     try {
349                         writeTx.write(TestModel.TEST_PATH,
350                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
351
352                         txCohort.set(writeTx.ready());
353                     } catch(Exception e) {
354                         caughtEx.set(e);
355                         return;
356                     } finally {
357                         txReady.countDown();
358                     }
359                 }
360             };
361
362             txThread.start();
363
364             // Wait for the Tx operations to complete.
365
366             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
367             if(caughtEx.get() != null) {
368                 throw caughtEx.get();
369             }
370
371             assertEquals("Tx ready", true, done);
372
373             // Wait for the commit to complete. Since the shard never initialized, the Tx should
374             // have timed out and throw an appropriate exception cause.
375
376             try {
377                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
378             } catch(ExecutionException e) {
379                 throw e.getCause();
380             } finally {
381                 blockRecoveryLatch.countDown();
382                 cleanup(dataStore);
383             }
384         }};
385     }
386
387     @Test(expected=NotInitializedException.class)
388     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
389         new IntegrationTestKit(getSystem()) {{
390             String testName = "testTransactionReadFailureWithShardNotInitialized";
391             String shardName = "test-1";
392
393             // Set the shard initialization timeout low for the test.
394
395             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
396
397             // Setup the InMemoryJournal to block shard recovery indefinitely.
398
399             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
400             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
401             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
402
403             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
404
405             // Create the read-write Tx
406
407             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
408             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
409
410             // Do a read on the Tx on a separate thread.
411
412             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
413                     txReadFuture = new AtomicReference<>();
414             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
415             final CountDownLatch txReadDone = new CountDownLatch(1);
416             Thread txThread = new Thread() {
417                 @Override
418                 public void run() {
419                     try {
420                         readWriteTx.write(TestModel.TEST_PATH,
421                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
422
423                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
424
425                         readWriteTx.close();
426                     } catch(Exception e) {
427                         caughtEx.set(e);
428                         return;
429                     } finally {
430                         txReadDone.countDown();
431                     }
432                 }
433             };
434
435             txThread.start();
436
437             // Wait for the Tx operations to complete.
438
439             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
440             if(caughtEx.get() != null) {
441                 throw caughtEx.get();
442             }
443
444             assertEquals("Tx read done", true, done);
445
446             // Wait for the read to complete. Since the shard never initialized, the Tx should
447             // have timed out and throw an appropriate exception cause.
448
449             try {
450                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
451             } catch(ReadFailedException e) {
452                 throw e.getCause();
453             } finally {
454                 blockRecoveryLatch.countDown();
455                 cleanup(dataStore);
456             }
457         }};
458     }
459
460     @Test(expected=NoShardLeaderException.class)
461     public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
462         new IntegrationTestKit(getSystem()) {{
463             String testName = "testTransactionCommitFailureWithNoShardLeader";
464             String shardName = "test-1";
465
466             // We don't want the shard to become the leader so prevent shard election from completing
467             // by setting the election timeout, which is based on the heartbeat interval, really high.
468
469             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
470
471             // Set the leader election timeout low for the test.
472
473             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
474
475             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
476
477             // Create the write Tx.
478
479             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
480             assertNotNull("newReadWriteTransaction returned null", writeTx);
481
482             // Do some modifications and ready the Tx on a separate thread.
483
484             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
485             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
486             final CountDownLatch txReady = new CountDownLatch(1);
487             Thread txThread = new Thread() {
488                 @Override
489                 public void run() {
490                     try {
491                         writeTx.write(TestModel.TEST_PATH,
492                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
493
494                         txCohort.set(writeTx.ready());
495                     } catch(Exception e) {
496                         caughtEx.set(e);
497                         return;
498                     } finally {
499                         txReady.countDown();
500                     }
501                 }
502             };
503
504             txThread.start();
505
506             // Wait for the Tx operations to complete.
507
508             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
509             if(caughtEx.get() != null) {
510                 throw caughtEx.get();
511             }
512
513             assertEquals("Tx ready", true, done);
514
515             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
516             // should have timed out and throw an appropriate exception cause.
517
518             try {
519                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
520             } catch(ExecutionException e) {
521                 throw e.getCause();
522             } finally {
523                 cleanup(dataStore);
524             }
525         }};
526     }
527
528     @Test
529     public void testTransactionAbort() throws Exception{
530         System.setProperty("shard.persistent", "true");
531         new IntegrationTestKit(getSystem()) {{
532             DistributedDataStore dataStore =
533                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
534
535             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
536             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
537
538             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
539
540             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
541
542             cohort.canCommit().get(5, TimeUnit.SECONDS);
543
544             cohort.abort().get(5, TimeUnit.SECONDS);
545
546             testWriteTransaction(dataStore, TestModel.TEST_PATH,
547                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548
549             cleanup(dataStore);
550         }};
551     }
552
553     @Test
554     public void testTransactionChain() throws Exception{
555         System.setProperty("shard.persistent", "true");
556         new IntegrationTestKit(getSystem()) {{
557             DistributedDataStore dataStore =
558                     setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
559
560             // 1. Create a Tx chain and write-only Tx
561
562             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
563
564             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
565             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
566
567             // 2. Write some data
568
569             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
570             writeTx.write(TestModel.TEST_PATH, testNode);
571
572             // 3. Ready the Tx for commit
573
574             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
575
576             // 4. Commit the Tx on another thread that first waits for the second read Tx.
577
578             final CountDownLatch continueCommit1 = new CountDownLatch(1);
579             final CountDownLatch commit1Done = new CountDownLatch(1);
580             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
581             new Thread() {
582                 @Override
583                 public void run() {
584                     try {
585                         continueCommit1.await();
586                         doCommit(cohort1);
587                     } catch (Exception e) {
588                         commit1Error.set(e);
589                     } finally {
590                         commit1Done.countDown();
591                     }
592                 }
593             }.start();
594
595             // 5. Create a new read Tx from the chain to read and verify the data from the first
596             // Tx is visible after being readied.
597
598             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
599             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
600             assertEquals("isPresent", true, optional.isPresent());
601             assertEquals("Data node", testNode, optional.get());
602
603             // 6. Create a new RW Tx from the chain, write more data, and ready it
604
605             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
606             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
607             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
608
609             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
610
611             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
612             // verify it is visible.
613
614             readTx = txChain.newReadOnlyTransaction();
615             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
616             assertEquals("isPresent", true, optional.isPresent());
617             assertEquals("Data node", outerNode, optional.get());
618
619             // 8. Wait for the 2 commits to complete and close the chain.
620
621             continueCommit1.countDown();
622             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
623
624             if(commit1Error.get() != null) {
625                 throw commit1Error.get();
626             }
627
628             doCommit(cohort2);
629
630             txChain.close();
631
632             // 9. Create a new read Tx from the data store and verify committed data.
633
634             readTx = dataStore.newReadOnlyTransaction();
635             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
636             assertEquals("isPresent", true, optional.isPresent());
637             assertEquals("Data node", outerNode, optional.get());
638
639             cleanup(dataStore);
640         }
641
642         private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
643             Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
644             assertEquals("canCommit", true, canCommit);
645             cohort1.preCommit().get(5, TimeUnit.SECONDS);
646             cohort1.commit().get(5, TimeUnit.SECONDS);
647         }};
648     }
649
650     @Test
651     public void testChangeListenerRegistration() throws Exception{
652         new IntegrationTestKit(getSystem()) {{
653             DistributedDataStore dataStore =
654                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
655
656             MockDataChangeListener listener = new MockDataChangeListener(3);
657
658             ListenerRegistration<MockDataChangeListener>
659                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
660                             DataChangeScope.SUBTREE);
661
662             assertNotNull("registerChangeListener returned null", listenerReg);
663
664             testWriteTransaction(dataStore, TestModel.TEST_PATH,
665                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
666
667             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
668                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
669
670             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
671                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
672             testWriteTransaction(dataStore, listPath,
673                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
674
675             listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
676
677             listenerReg.close();
678
679             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
680                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
681                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
682
683             listener.expectNoMoreChanges("Received unexpected change after close");
684
685             cleanup(dataStore);
686         }};
687     }
688
689     class IntegrationTestKit extends ShardTestKit {
690
691         IntegrationTestKit(ActorSystem actorSystem) {
692             super(actorSystem);
693         }
694
695         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
696             return setupDistributedDataStore(typeName, true, shardNames);
697         }
698
699         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
700                 String... shardNames) {
701             MockClusterWrapper cluster = new MockClusterWrapper();
702             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
703             ShardStrategyFactory.setConfiguration(config);
704
705             DatastoreContext datastoreContext = datastoreContextBuilder.build();
706             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
707                     config, datastoreContext);
708
709             SchemaContext schemaContext = SchemaContextHelper.full();
710             dataStore.onGlobalContextUpdated(schemaContext);
711
712             if(waitUntilLeader) {
713                 for(String shardName: shardNames) {
714                     ActorRef shard = null;
715                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
716                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
717                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
718                         if(shardReply.isPresent()) {
719                             shard = shardReply.get();
720                         }
721                     }
722
723                     assertNotNull("Shard was not created", shard);
724
725                     waitUntilLeader(shard);
726                 }
727             }
728
729             return dataStore;
730         }
731
732         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
733                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
734
735             // 1. Create a write-only Tx
736
737             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
738             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
739
740             // 2. Write some data
741
742             writeTx.write(nodePath, nodeToWrite);
743
744             // 3. Ready the Tx for commit
745
746             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
747
748             // 4. Commit the Tx
749
750             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
751             assertEquals("canCommit", true, canCommit);
752             cohort.preCommit().get(5, TimeUnit.SECONDS);
753             cohort.commit().get(5, TimeUnit.SECONDS);
754
755             // 5. Verify the data in the store
756
757             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
758
759             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
760             assertEquals("isPresent", true, optional.isPresent());
761             assertEquals("Data node", nodeToWrite, optional.get());
762         }
763
764         void cleanup(DistributedDataStore dataStore) {
765             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
766         }
767     }
768
769 }