BUG 2134: Make persistence configurable at the datastore level
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.PoisonPill;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.CheckedFuture;
8 import com.google.common.util.concurrent.Uninterruptibles;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
11 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
12 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
13 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
14 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
15 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
16 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
17 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
18 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
19 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
21 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.concepts.ListenerRegistration;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import static org.junit.Assert.assertEquals;
38 import static org.junit.Assert.assertNotNull;
39
40 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
41
42     private final DatastoreContext.Builder datastoreContextBuilder =
43             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
44
45     @Test
46     public void testWriteTransactionWithSingleShard() throws Exception{
47         new IntegrationTestKit(getSystem()) {{
48             DistributedDataStore dataStore =
49                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
50
51             testWriteTransaction(dataStore, TestModel.TEST_PATH,
52                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
53
54             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
55                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
56
57             cleanup(dataStore);
58         }};
59     }
60
61     @Test
62     public void testWriteTransactionWithMultipleShards() throws Exception{
63         new IntegrationTestKit(getSystem()) {{
64             DistributedDataStore dataStore =
65                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
66
67             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
68             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
69
70             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
71             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
72             writeTx.write(nodePath1, nodeToWrite1);
73
74             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
75             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
76             writeTx.write(nodePath2, nodeToWrite2);
77
78             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
79
80             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
81             assertEquals("canCommit", true, canCommit);
82             cohort.preCommit().get(5, TimeUnit.SECONDS);
83             cohort.commit().get(5, TimeUnit.SECONDS);
84
85             // Verify the data in the store
86
87             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
88
89             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
90             assertEquals("isPresent", true, optional.isPresent());
91             assertEquals("Data node", nodeToWrite1, optional.get());
92
93             optional = readTx.read(nodePath2).get();
94             assertEquals("isPresent", true, optional.isPresent());
95             assertEquals("Data node", nodeToWrite2, optional.get());
96
97             cleanup(dataStore);
98         }};
99     }
100
101     @Test
102     public void testReadWriteTransaction() throws Exception{
103         System.setProperty("shard.persistent", "true");
104         new IntegrationTestKit(getSystem()) {{
105             DistributedDataStore dataStore =
106                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
107
108             // 1. Create a read-write Tx
109
110             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
111             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
112
113             // 2. Write some data
114
115             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
116             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
117             readWriteTx.write(nodePath, nodeToWrite );
118
119             // 3. Read the data from Tx
120
121             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
122             assertEquals("exists", true, exists);
123
124             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
125             assertEquals("isPresent", true, optional.isPresent());
126             assertEquals("Data node", nodeToWrite, optional.get());
127
128             // 4. Ready the Tx for commit
129
130             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
131
132             // 5. Commit the Tx
133
134             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
135             assertEquals("canCommit", true, canCommit);
136             cohort.preCommit().get(5, TimeUnit.SECONDS);
137             cohort.commit().get(5, TimeUnit.SECONDS);
138
139             // 6. Verify the data in the store
140
141             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
142
143             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
144             assertEquals("isPresent", true, optional.isPresent());
145             assertEquals("Data node", nodeToWrite, optional.get());
146
147             cleanup(dataStore);
148         }};
149     }
150
151     @Test
152     public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
153         new IntegrationTestKit(getSystem()) {{
154             String testName = "testTransactionWritesWithShardNotInitiallyReady";
155             String shardName = "test-1";
156
157             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
158             // initialized until we create and submit the write the Tx.
159             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
160             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
161             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
162
163             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
164
165             // Create the write Tx
166
167             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
168             assertNotNull("newReadWriteTransaction returned null", writeTx);
169
170             // Do some modification operations and ready the Tx on a separate thread.
171
172             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174                             TestModel.ID_QNAME, 1).build();
175
176             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178             final CountDownLatch txReady = new CountDownLatch(1);
179             Thread txThread = new Thread() {
180                 @Override
181                 public void run() {
182                     try {
183                         writeTx.write(TestModel.TEST_PATH,
184                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
185
186                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187                                 TestModel.OUTER_LIST_QNAME).build());
188
189                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
191
192                         writeTx.delete(listEntryPath);
193
194                         txCohort.set(writeTx.ready());
195                     } catch(Exception e) {
196                         caughtEx.set(e);
197                         return;
198                     } finally {
199                         txReady.countDown();
200                     }
201                 }
202             };
203
204             txThread.start();
205
206             // Wait for the Tx operations to complete.
207
208             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
209             if(caughtEx.get() != null) {
210                 throw caughtEx.get();
211             }
212
213             assertEquals("Tx ready", true, done);
214
215             // At this point the Tx operations should be waiting for the shard to initialize so
216             // trigger the latch to let the shard recovery to continue.
217
218             blockRecoveryLatch.countDown();
219
220             // Wait for the Tx commit to complete.
221
222             assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
223             txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
224             txCohort.get().commit().get(5, TimeUnit.SECONDS);
225
226             // Verify the data in the store
227
228             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
229
230             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
231                     get(5, TimeUnit.SECONDS);
232             assertEquals("isPresent", true, optional.isPresent());
233
234             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
235             assertEquals("isPresent", true, optional.isPresent());
236
237             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
238             assertEquals("isPresent", false, optional.isPresent());
239
240             cleanup(dataStore);
241         }};
242     }
243
244     @Test
245     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
246         new IntegrationTestKit(getSystem()) {{
247             String testName = "testTransactionReadsWithShardNotInitiallyReady";
248             String shardName = "test-1";
249
250             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
251             // initialized until we create the Tx.
252             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
253             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
254             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
255
256             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
257
258             // Create the read-write Tx
259
260             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
261             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
262
263             // Do some reads on the Tx on a separate thread.
264
265             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
266                     new AtomicReference<>();
267             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
268                     txReadFuture = new AtomicReference<>();
269             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
270             final CountDownLatch txReadsDone = new CountDownLatch(1);
271             Thread txThread = new Thread() {
272                 @Override
273                 public void run() {
274                     try {
275                         readWriteTx.write(TestModel.TEST_PATH,
276                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
277
278                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
279
280                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
281                     } catch(Exception e) {
282                         caughtEx.set(e);
283                         return;
284                     } finally {
285                         txReadsDone.countDown();
286                     }
287                 }
288             };
289
290             txThread.start();
291
292             // Wait for the Tx operations to complete.
293
294             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
295             if(caughtEx.get() != null) {
296                 throw caughtEx.get();
297             }
298
299             assertEquals("Tx reads done", true, done);
300
301             // At this point the Tx operations should be waiting for the shard to initialize so
302             // trigger the latch to let the shard recovery to continue.
303
304             blockRecoveryLatch.countDown();
305
306             // Wait for the reads to complete and verify.
307
308             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
309             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
310
311             readWriteTx.close();
312
313             cleanup(dataStore);
314         }};
315     }
316
317     @Test(expected=NotInitializedException.class)
318     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
319         new IntegrationTestKit(getSystem()) {{
320             String testName = "testTransactionCommitFailureWithShardNotInitialized";
321             String shardName = "test-1";
322
323             // Set the shard initialization timeout low for the test.
324
325             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
326
327             // Setup the InMemoryJournal to block shard recovery indefinitely.
328
329             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
330             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
331             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
332
333             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
334
335             // Create the write Tx
336
337             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
338             assertNotNull("newReadWriteTransaction returned null", writeTx);
339
340             // Do some modifications and ready the Tx on a separate thread.
341
342             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
343             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
344             final CountDownLatch txReady = new CountDownLatch(1);
345             Thread txThread = new Thread() {
346                 @Override
347                 public void run() {
348                     try {
349                         writeTx.write(TestModel.TEST_PATH,
350                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
351
352                         txCohort.set(writeTx.ready());
353                     } catch(Exception e) {
354                         caughtEx.set(e);
355                         return;
356                     } finally {
357                         txReady.countDown();
358                     }
359                 }
360             };
361
362             txThread.start();
363
364             // Wait for the Tx operations to complete.
365
366             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
367             if(caughtEx.get() != null) {
368                 throw caughtEx.get();
369             }
370
371             assertEquals("Tx ready", true, done);
372
373             // Wait for the commit to complete. Since the shard never initialized, the Tx should
374             // have timed out and throw an appropriate exception cause.
375
376             try {
377                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
378             } catch(ExecutionException e) {
379                 throw e.getCause();
380             } finally {
381                 blockRecoveryLatch.countDown();
382                 cleanup(dataStore);
383             }
384         }};
385     }
386
387     @Test(expected=NotInitializedException.class)
388     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
389         new IntegrationTestKit(getSystem()) {{
390             String testName = "testTransactionReadFailureWithShardNotInitialized";
391             String shardName = "test-1";
392
393             // Set the shard initialization timeout low for the test.
394
395             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
396
397             // Setup the InMemoryJournal to block shard recovery indefinitely.
398
399             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
400             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
401             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
402
403             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
404
405             // Create the read-write Tx
406
407             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
408             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
409
410             // Do a read on the Tx on a separate thread.
411
412             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
413                     txReadFuture = new AtomicReference<>();
414             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
415             final CountDownLatch txReadDone = new CountDownLatch(1);
416             Thread txThread = new Thread() {
417                 @Override
418                 public void run() {
419                     try {
420                         readWriteTx.write(TestModel.TEST_PATH,
421                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
422
423                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
424
425                         readWriteTx.close();
426                     } catch(Exception e) {
427                         caughtEx.set(e);
428                         return;
429                     } finally {
430                         txReadDone.countDown();
431                     }
432                 }
433             };
434
435             txThread.start();
436
437             // Wait for the Tx operations to complete.
438
439             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
440             if(caughtEx.get() != null) {
441                 throw caughtEx.get();
442             }
443
444             assertEquals("Tx read done", true, done);
445
446             // Wait for the read to complete. Since the shard never initialized, the Tx should
447             // have timed out and throw an appropriate exception cause.
448
449             try {
450                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
451             } catch(ReadFailedException e) {
452                 throw e.getCause();
453             } finally {
454                 blockRecoveryLatch.countDown();
455                 cleanup(dataStore);
456             }
457         }};
458     }
459
460     @Test(expected=NoShardLeaderException.class)
461     public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
462         new IntegrationTestKit(getSystem()) {{
463             String testName = "testTransactionCommitFailureWithNoShardLeader";
464             String shardName = "test-1";
465
466             // We don't want the shard to become the leader so prevent shard election from completing
467             // by setting the election timeout, which is based on the heartbeat interval, really high.
468
469             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
470
471             // Set the leader election timeout low for the test.
472
473             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
474
475             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
476
477             // Create the write Tx.
478
479             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
480             assertNotNull("newReadWriteTransaction returned null", writeTx);
481
482             // Do some modifications and ready the Tx on a separate thread.
483
484             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
485             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
486             final CountDownLatch txReady = new CountDownLatch(1);
487             Thread txThread = new Thread() {
488                 @Override
489                 public void run() {
490                     try {
491                         writeTx.write(TestModel.TEST_PATH,
492                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
493
494                         txCohort.set(writeTx.ready());
495                     } catch(Exception e) {
496                         caughtEx.set(e);
497                         return;
498                     } finally {
499                         txReady.countDown();
500                     }
501                 }
502             };
503
504             txThread.start();
505
506             // Wait for the Tx operations to complete.
507
508             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
509             if(caughtEx.get() != null) {
510                 throw caughtEx.get();
511             }
512
513             assertEquals("Tx ready", true, done);
514
515             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
516             // should have timed out and throw an appropriate exception cause.
517
518             try {
519                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
520             } catch(ExecutionException e) {
521                 throw e.getCause();
522             } finally {
523                 cleanup(dataStore);
524             }
525         }};
526     }
527
528     @Test
529     public void testTransactionAbort() throws Exception{
530         System.setProperty("shard.persistent", "true");
531         new IntegrationTestKit(getSystem()) {{
532             DistributedDataStore dataStore =
533                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
534
535             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
536             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
537
538             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
539
540             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
541
542             cohort.canCommit().get(5, TimeUnit.SECONDS);
543
544             cohort.abort().get(5, TimeUnit.SECONDS);
545
546             testWriteTransaction(dataStore, TestModel.TEST_PATH,
547                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
548
549             cleanup(dataStore);
550         }};
551     }
552
553     @Test
554     public void testTransactionChain() throws Exception{
555         System.setProperty("shard.persistent", "true");
556         new IntegrationTestKit(getSystem()) {{
557             DistributedDataStore dataStore =
558                     setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
559
560             // 1. Create a Tx chain and write-only Tx
561
562             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
563
564             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
565             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
566
567             // 2. Write some data
568
569             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
570             writeTx.write(TestModel.TEST_PATH, containerNode);
571
572             // 3. Ready the Tx for commit
573
574             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
575
576             // 4. Commit the Tx
577
578             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
579             assertEquals("canCommit", true, canCommit);
580             cohort.preCommit().get(5, TimeUnit.SECONDS);
581             cohort.commit().get(5, TimeUnit.SECONDS);
582
583             // 5. Verify the data in the store
584
585             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
586
587             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
588             assertEquals("isPresent", true, optional.isPresent());
589             assertEquals("Data node", containerNode, optional.get());
590
591             txChain.close();
592
593             cleanup(dataStore);
594         }};
595     }
596
597     @Test
598     public void testChangeListenerRegistration() throws Exception{
599         new IntegrationTestKit(getSystem()) {{
600             DistributedDataStore dataStore =
601                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
602
603             MockDataChangeListener listener = new MockDataChangeListener(3);
604
605             ListenerRegistration<MockDataChangeListener>
606                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
607                             DataChangeScope.SUBTREE);
608
609             assertNotNull("registerChangeListener returned null", listenerReg);
610
611             testWriteTransaction(dataStore, TestModel.TEST_PATH,
612                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
613
614             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
615                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
616
617             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
618                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
619             testWriteTransaction(dataStore, listPath,
620                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
621
622             listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
623
624             listenerReg.close();
625
626             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
627                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
628                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
629
630             listener.expectNoMoreChanges("Received unexpected change after close");
631
632             cleanup(dataStore);
633         }};
634     }
635
636     class IntegrationTestKit extends ShardTestKit {
637
638         IntegrationTestKit(ActorSystem actorSystem) {
639             super(actorSystem);
640         }
641
642         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
643             return setupDistributedDataStore(typeName, true, shardNames);
644         }
645
646         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
647                 String... shardNames) {
648             MockClusterWrapper cluster = new MockClusterWrapper();
649             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
650             ShardStrategyFactory.setConfiguration(config);
651
652             DatastoreContext datastoreContext = datastoreContextBuilder.build();
653             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
654                     config, datastoreContext);
655
656             SchemaContext schemaContext = SchemaContextHelper.full();
657             dataStore.onGlobalContextUpdated(schemaContext);
658
659             if(waitUntilLeader) {
660                 for(String shardName: shardNames) {
661                     ActorRef shard = null;
662                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
663                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
664                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
665                         if(shardReply.isPresent()) {
666                             shard = shardReply.get();
667                         }
668                     }
669
670                     assertNotNull("Shard was not created", shard);
671
672                     waitUntilLeader(shard);
673                 }
674             }
675
676             return dataStore;
677         }
678
679         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
680                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
681
682             // 1. Create a write-only Tx
683
684             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
685             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
686
687             // 2. Write some data
688
689             writeTx.write(nodePath, nodeToWrite);
690
691             // 3. Ready the Tx for commit
692
693             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
694
695             // 4. Commit the Tx
696
697             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
698             assertEquals("canCommit", true, canCommit);
699             cohort.preCommit().get(5, TimeUnit.SECONDS);
700             cohort.commit().get(5, TimeUnit.SECONDS);
701
702             // 5. Verify the data in the store
703
704             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
705
706             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
707             assertEquals("isPresent", true, optional.isPresent());
708             assertEquals("Data node", nodeToWrite, optional.get());
709         }
710
711         void cleanup(DistributedDataStore dataStore) {
712             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
713         }
714     }
715
716 }