Fix raw references to Promise
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.concurrent.Callable;
13 import java.util.concurrent.CountDownLatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.atomic.AtomicReference;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
20 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
21 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
22 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
23 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
24 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
25 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
26 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
27 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42
43 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
44
45     private final DatastoreContext.Builder datastoreContextBuilder =
46             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
47
48     @Test
49     public void testWriteTransactionWithSingleShard() throws Exception{
50         new IntegrationTestKit(getSystem()) {{
51             DistributedDataStore dataStore =
52                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
53
54             testWriteTransaction(dataStore, TestModel.TEST_PATH,
55                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
56
57             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
58                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
59
60             cleanup(dataStore);
61         }};
62     }
63
64     @Test
65     public void testWriteTransactionWithMultipleShards() throws Exception{
66         new IntegrationTestKit(getSystem()) {{
67             DistributedDataStore dataStore =
68                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
69
70             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
71             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
72
73             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
74             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
75             writeTx.write(nodePath1, nodeToWrite1);
76
77             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
78             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
79             writeTx.write(nodePath2, nodeToWrite2);
80
81             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
82
83             doCommit(cohort);
84
85             // Verify the data in the store
86
87             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
88
89             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
90             assertEquals("isPresent", true, optional.isPresent());
91             assertEquals("Data node", nodeToWrite1, optional.get());
92
93             optional = readTx.read(nodePath2).get();
94             assertEquals("isPresent", true, optional.isPresent());
95             assertEquals("Data node", nodeToWrite2, optional.get());
96
97             cleanup(dataStore);
98         }};
99     }
100
101     @Test
102     public void testReadWriteTransaction() throws Exception{
103         System.setProperty("shard.persistent", "true");
104         new IntegrationTestKit(getSystem()) {{
105             DistributedDataStore dataStore =
106                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
107
108             // 1. Create a read-write Tx
109
110             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
111             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
112
113             // 2. Write some data
114
115             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
116             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
117             readWriteTx.write(nodePath, nodeToWrite );
118
119             // 3. Read the data from Tx
120
121             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
122             assertEquals("exists", true, exists);
123
124             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
125             assertEquals("isPresent", true, optional.isPresent());
126             assertEquals("Data node", nodeToWrite, optional.get());
127
128             // 4. Ready the Tx for commit
129
130             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
131
132             // 5. Commit the Tx
133
134             doCommit(cohort);
135
136             // 6. Verify the data in the store
137
138             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
139
140             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
141             assertEquals("isPresent", true, optional.isPresent());
142             assertEquals("Data node", nodeToWrite, optional.get());
143
144             cleanup(dataStore);
145         }};
146     }
147
148     @Test
149     public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
150         new IntegrationTestKit(getSystem()) {{
151             String testName = "testTransactionWritesWithShardNotInitiallyReady";
152             String shardName = "test-1";
153
154             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
155             // initialized until we create and submit the write the Tx.
156             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
157             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
158             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
159
160             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
161
162             // Create the write Tx
163
164             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
165             assertNotNull("newReadWriteTransaction returned null", writeTx);
166
167             // Do some modification operations and ready the Tx on a separate thread.
168
169             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
170                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
171                             TestModel.ID_QNAME, 1).build();
172
173             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
174             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
175             final CountDownLatch txReady = new CountDownLatch(1);
176             Thread txThread = new Thread() {
177                 @Override
178                 public void run() {
179                     try {
180                         writeTx.write(TestModel.TEST_PATH,
181                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
182
183                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
184                                 TestModel.OUTER_LIST_QNAME).build());
185
186                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
187                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
188
189                         writeTx.delete(listEntryPath);
190
191                         txCohort.set(writeTx.ready());
192                     } catch(Exception e) {
193                         caughtEx.set(e);
194                         return;
195                     } finally {
196                         txReady.countDown();
197                     }
198                 }
199             };
200
201             txThread.start();
202
203             // Wait for the Tx operations to complete.
204
205             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
206             if(caughtEx.get() != null) {
207                 throw caughtEx.get();
208             }
209
210             assertEquals("Tx ready", true, done);
211
212             // At this point the Tx operations should be waiting for the shard to initialize so
213             // trigger the latch to let the shard recovery to continue.
214
215             blockRecoveryLatch.countDown();
216
217             // Wait for the Tx commit to complete.
218
219             doCommit(txCohort.get());
220
221             // Verify the data in the store
222
223             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
224
225             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
226                     get(5, TimeUnit.SECONDS);
227             assertEquals("isPresent", true, optional.isPresent());
228
229             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
230             assertEquals("isPresent", true, optional.isPresent());
231
232             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
233             assertEquals("isPresent", false, optional.isPresent());
234
235             cleanup(dataStore);
236         }};
237     }
238
239     @Test
240     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
241         new IntegrationTestKit(getSystem()) {{
242             String testName = "testTransactionReadsWithShardNotInitiallyReady";
243             String shardName = "test-1";
244
245             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
246             // initialized until we create the Tx.
247             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
248             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
249             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
250
251             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
252
253             // Create the read-write Tx
254
255             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
256             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
257
258             // Do some reads on the Tx on a separate thread.
259
260             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
261                     new AtomicReference<>();
262             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
263                     txReadFuture = new AtomicReference<>();
264             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
265             final CountDownLatch txReadsDone = new CountDownLatch(1);
266             Thread txThread = new Thread() {
267                 @Override
268                 public void run() {
269                     try {
270                         readWriteTx.write(TestModel.TEST_PATH,
271                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
272
273                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
274
275                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
276                     } catch(Exception e) {
277                         caughtEx.set(e);
278                         return;
279                     } finally {
280                         txReadsDone.countDown();
281                     }
282                 }
283             };
284
285             txThread.start();
286
287             // Wait for the Tx operations to complete.
288
289             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
290             if(caughtEx.get() != null) {
291                 throw caughtEx.get();
292             }
293
294             assertEquals("Tx reads done", true, done);
295
296             // At this point the Tx operations should be waiting for the shard to initialize so
297             // trigger the latch to let the shard recovery to continue.
298
299             blockRecoveryLatch.countDown();
300
301             // Wait for the reads to complete and verify.
302
303             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
304             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
305
306             readWriteTx.close();
307
308             cleanup(dataStore);
309         }};
310     }
311
312     @Test(expected=NotInitializedException.class)
313     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
314         new IntegrationTestKit(getSystem()) {{
315             String testName = "testTransactionCommitFailureWithShardNotInitialized";
316             String shardName = "test-1";
317
318             // Set the shard initialization timeout low for the test.
319
320             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
321
322             // Setup the InMemoryJournal to block shard recovery indefinitely.
323
324             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
325             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
326             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
327
328             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
329
330             // Create the write Tx
331
332             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
333             assertNotNull("newReadWriteTransaction returned null", writeTx);
334
335             // Do some modifications and ready the Tx on a separate thread.
336
337             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
338             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
339             final CountDownLatch txReady = new CountDownLatch(1);
340             Thread txThread = new Thread() {
341                 @Override
342                 public void run() {
343                     try {
344                         writeTx.write(TestModel.TEST_PATH,
345                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
346
347                         txCohort.set(writeTx.ready());
348                     } catch(Exception e) {
349                         caughtEx.set(e);
350                         return;
351                     } finally {
352                         txReady.countDown();
353                     }
354                 }
355             };
356
357             txThread.start();
358
359             // Wait for the Tx operations to complete.
360
361             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
362             if(caughtEx.get() != null) {
363                 throw caughtEx.get();
364             }
365
366             assertEquals("Tx ready", true, done);
367
368             // Wait for the commit to complete. Since the shard never initialized, the Tx should
369             // have timed out and throw an appropriate exception cause.
370
371             try {
372                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
373             } catch(ExecutionException e) {
374                 throw e.getCause();
375             } finally {
376                 blockRecoveryLatch.countDown();
377                 cleanup(dataStore);
378             }
379         }};
380     }
381
382     @Test(expected=NotInitializedException.class)
383     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
384         new IntegrationTestKit(getSystem()) {{
385             String testName = "testTransactionReadFailureWithShardNotInitialized";
386             String shardName = "test-1";
387
388             // Set the shard initialization timeout low for the test.
389
390             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
391
392             // Setup the InMemoryJournal to block shard recovery indefinitely.
393
394             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
395             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
396             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
397
398             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
399
400             // Create the read-write Tx
401
402             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
403             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
404
405             // Do a read on the Tx on a separate thread.
406
407             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
408                     txReadFuture = new AtomicReference<>();
409             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
410             final CountDownLatch txReadDone = new CountDownLatch(1);
411             Thread txThread = new Thread() {
412                 @Override
413                 public void run() {
414                     try {
415                         readWriteTx.write(TestModel.TEST_PATH,
416                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
417
418                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
419
420                         readWriteTx.close();
421                     } catch(Exception e) {
422                         caughtEx.set(e);
423                         return;
424                     } finally {
425                         txReadDone.countDown();
426                     }
427                 }
428             };
429
430             txThread.start();
431
432             // Wait for the Tx operations to complete.
433
434             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
435             if(caughtEx.get() != null) {
436                 throw caughtEx.get();
437             }
438
439             assertEquals("Tx read done", true, done);
440
441             // Wait for the read to complete. Since the shard never initialized, the Tx should
442             // have timed out and throw an appropriate exception cause.
443
444             try {
445                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
446             } catch(ReadFailedException e) {
447                 throw e.getCause();
448             } finally {
449                 blockRecoveryLatch.countDown();
450                 cleanup(dataStore);
451             }
452         }};
453     }
454
455     @Test(expected=NoShardLeaderException.class)
456     public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
457         new IntegrationTestKit(getSystem()) {{
458             String testName = "testTransactionCommitFailureWithNoShardLeader";
459             String shardName = "test-1";
460
461             // We don't want the shard to become the leader so prevent shard election from completing
462             // by setting the election timeout, which is based on the heartbeat interval, really high.
463
464             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
465
466             // Set the leader election timeout low for the test.
467
468             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
469
470             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
471
472             // Create the write Tx.
473
474             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
475             assertNotNull("newReadWriteTransaction returned null", writeTx);
476
477             // Do some modifications and ready the Tx on a separate thread.
478
479             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
480             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
481             final CountDownLatch txReady = new CountDownLatch(1);
482             Thread txThread = new Thread() {
483                 @Override
484                 public void run() {
485                     try {
486                         writeTx.write(TestModel.TEST_PATH,
487                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
488
489                         txCohort.set(writeTx.ready());
490                     } catch(Exception e) {
491                         caughtEx.set(e);
492                         return;
493                     } finally {
494                         txReady.countDown();
495                     }
496                 }
497             };
498
499             txThread.start();
500
501             // Wait for the Tx operations to complete.
502
503             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
504             if(caughtEx.get() != null) {
505                 throw caughtEx.get();
506             }
507
508             assertEquals("Tx ready", true, done);
509
510             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
511             // should have timed out and throw an appropriate exception cause.
512
513             try {
514                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
515             } catch(ExecutionException e) {
516                 throw e.getCause();
517             } finally {
518                 cleanup(dataStore);
519             }
520         }};
521     }
522
523     @Test
524     public void testTransactionAbort() throws Exception{
525         System.setProperty("shard.persistent", "true");
526         new IntegrationTestKit(getSystem()) {{
527             DistributedDataStore dataStore =
528                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
529
530             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
531             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
532
533             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
534
535             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
536
537             cohort.canCommit().get(5, TimeUnit.SECONDS);
538
539             cohort.abort().get(5, TimeUnit.SECONDS);
540
541             testWriteTransaction(dataStore, TestModel.TEST_PATH,
542                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
543
544             cleanup(dataStore);
545         }};
546     }
547
548     @Test
549     public void testTransactionChain() throws Exception{
550         new IntegrationTestKit(getSystem()) {{
551             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
552
553             // 1. Create a Tx chain and write-only Tx
554
555             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
556
557             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
558             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
559
560             // 2. Write some data
561
562             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
563             writeTx.write(TestModel.TEST_PATH, testNode);
564
565             // 3. Ready the Tx for commit
566
567             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
568
569             // 4. Commit the Tx on another thread that first waits for the second read Tx.
570
571             final CountDownLatch continueCommit1 = new CountDownLatch(1);
572             final CountDownLatch commit1Done = new CountDownLatch(1);
573             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
574             new Thread() {
575                 @Override
576                 public void run() {
577                     try {
578                         continueCommit1.await();
579                         doCommit(cohort1);
580                     } catch (Exception e) {
581                         commit1Error.set(e);
582                     } finally {
583                         commit1Done.countDown();
584                     }
585                 }
586             }.start();
587
588             // 5. Create a new read Tx from the chain to read and verify the data from the first
589             // Tx is visible after being readied.
590
591             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
592             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
593             assertEquals("isPresent", true, optional.isPresent());
594             assertEquals("Data node", testNode, optional.get());
595
596             // 6. Create a new RW Tx from the chain, write more data, and ready it
597
598             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
599             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
600             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
601
602             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
603
604             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
605             // verify it is visible.
606
607             readTx = txChain.newReadWriteTransaction();
608             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
609             assertEquals("isPresent", true, optional.isPresent());
610             assertEquals("Data node", outerNode, optional.get());
611
612             // 8. Wait for the 2 commits to complete and close the chain.
613
614             continueCommit1.countDown();
615             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
616
617             if(commit1Error.get() != null) {
618                 throw commit1Error.get();
619             }
620
621             doCommit(cohort2);
622
623             txChain.close();
624
625             // 9. Create a new read Tx from the data store and verify committed data.
626
627             readTx = dataStore.newReadOnlyTransaction();
628             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
629             assertEquals("isPresent", true, optional.isPresent());
630             assertEquals("Data node", outerNode, optional.get());
631
632             cleanup(dataStore);
633         }};
634     }
635
636     @Test
637     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
638         new IntegrationTestKit(getSystem()) {{
639             DistributedDataStore dataStore = setupDistributedDataStore(
640                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
641
642             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
643
644             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
645             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
646
647             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
648
649             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
650             // readied.
651
652             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
653         }};
654     }
655
656     @Test
657     public void testCreateChainedTransactionAfterClose() throws Throwable {
658         new IntegrationTestKit(getSystem()) {{
659             DistributedDataStore dataStore = setupDistributedDataStore(
660                     "testCreateChainedTransactionAfterClose", "test-1");
661
662             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
663
664             txChain.close();
665
666             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
667
668             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
669         }};
670     }
671
672     @Test
673     public void testChangeListenerRegistration() throws Exception{
674         new IntegrationTestKit(getSystem()) {{
675             DistributedDataStore dataStore =
676                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
677
678             testWriteTransaction(dataStore, TestModel.TEST_PATH,
679                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
680
681             MockDataChangeListener listener = new MockDataChangeListener(1);
682
683             ListenerRegistration<MockDataChangeListener>
684                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
685                             DataChangeScope.SUBTREE);
686
687             assertNotNull("registerChangeListener returned null", listenerReg);
688
689             // Wait for the initial notification
690
691             listener.waitForChangeEvents(TestModel.TEST_PATH);
692
693             listener.reset(2);
694
695             // Write 2 updates.
696
697             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
698                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
699
700             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
701                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
702             testWriteTransaction(dataStore, listPath,
703                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
704
705             // Wait for the 2 updates.
706
707             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
708
709             listenerReg.close();
710
711             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
712                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
713                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
714
715             listener.expectNoMoreChanges("Received unexpected change after close");
716
717             cleanup(dataStore);
718         }};
719     }
720
721     class IntegrationTestKit extends ShardTestKit {
722
723         IntegrationTestKit(ActorSystem actorSystem) {
724             super(actorSystem);
725         }
726
727         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
728             return setupDistributedDataStore(typeName, true, shardNames);
729         }
730
731         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
732                 String... shardNames) {
733             MockClusterWrapper cluster = new MockClusterWrapper();
734             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
735             ShardStrategyFactory.setConfiguration(config);
736
737             DatastoreContext datastoreContext = datastoreContextBuilder.build();
738             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster,
739                     config, datastoreContext);
740
741             SchemaContext schemaContext = SchemaContextHelper.full();
742             dataStore.onGlobalContextUpdated(schemaContext);
743
744             if(waitUntilLeader) {
745                 for(String shardName: shardNames) {
746                     ActorRef shard = null;
747                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
748                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
749                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
750                         if(shardReply.isPresent()) {
751                             shard = shardReply.get();
752                         }
753                     }
754
755                     assertNotNull("Shard was not created", shard);
756
757                     waitUntilLeader(shard);
758                 }
759             }
760
761             return dataStore;
762         }
763
764         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
765                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
766
767             // 1. Create a write-only Tx
768
769             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
770             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
771
772             // 2. Write some data
773
774             writeTx.write(nodePath, nodeToWrite);
775
776             // 3. Ready the Tx for commit
777
778             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
779
780             // 4. Commit the Tx
781
782             doCommit(cohort);
783
784             // 5. Verify the data in the store
785
786             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
787
788             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
789             assertEquals("isPresent", true, optional.isPresent());
790             assertEquals("Data node", nodeToWrite, optional.get());
791         }
792
793         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
794             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
795             assertEquals("canCommit", true, canCommit);
796             cohort.preCommit().get(5, TimeUnit.SECONDS);
797             cohort.commit().get(5, TimeUnit.SECONDS);
798         }
799
800         void cleanup(DistributedDataStore dataStore) {
801             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
802         }
803
804         void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
805                 throws Exception {
806             try {
807                 callable.call();
808                 fail("Expected " + expType.getSimpleName());
809             } catch(Exception e) {
810                 assertEquals("Exception type", expType, e.getClass());
811             }
812         }
813
814         void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
815                 Class<? extends Exception> expType) throws Exception {
816             assertExceptionOnCall(new Callable<Void>() {
817                 @Override
818                 public Void call() throws Exception {
819                     txChain.newWriteOnlyTransaction();
820                     return null;
821                 }
822             }, expType);
823
824             assertExceptionOnCall(new Callable<Void>() {
825                 @Override
826                 public Void call() throws Exception {
827                     txChain.newReadWriteTransaction();
828                     return null;
829                 }
830             }, expType);
831
832             assertExceptionOnCall(new Callable<Void>() {
833                 @Override
834                 public Void call() throws Exception {
835                     txChain.newReadOnlyTransaction();
836                     return null;
837                 }
838             }, expType);
839         }
840     }
841
842 }