Merge "Startup archetype: remove 'Impl' from config subsystem Module name."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
23 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
24 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
25 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
26 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
27 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
28 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44
45 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
46
47     private final DatastoreContext.Builder datastoreContextBuilder =
48             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
49
50     @Test
51     public void testWriteTransactionWithSingleShard() throws Exception{
52         new IntegrationTestKit(getSystem()) {{
53             DistributedDataStore dataStore =
54                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
55
56             testWriteTransaction(dataStore, TestModel.TEST_PATH,
57                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
58
59             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
60                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
61
62             cleanup(dataStore);
63         }};
64     }
65
66     @Test
67     public void testWriteTransactionWithMultipleShards() throws Exception{
68         new IntegrationTestKit(getSystem()) {{
69             DistributedDataStore dataStore =
70                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
71
72             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
73             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
74
75             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
76             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
77             writeTx.write(nodePath1, nodeToWrite1);
78
79             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
80             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
81             writeTx.write(nodePath2, nodeToWrite2);
82
83             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
84
85             doCommit(cohort);
86
87             // Verify the data in the store
88
89             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
90
91             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
92             assertEquals("isPresent", true, optional.isPresent());
93             assertEquals("Data node", nodeToWrite1, optional.get());
94
95             optional = readTx.read(nodePath2).get();
96             assertEquals("isPresent", true, optional.isPresent());
97             assertEquals("Data node", nodeToWrite2, optional.get());
98
99             cleanup(dataStore);
100         }};
101     }
102
103     @Test
104     public void testReadWriteTransaction() throws Exception{
105         System.setProperty("shard.persistent", "true");
106         new IntegrationTestKit(getSystem()) {{
107             DistributedDataStore dataStore =
108                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
109
110             // 1. Create a read-write Tx
111
112             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
113             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
114
115             // 2. Write some data
116
117             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
118             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
119             readWriteTx.write(nodePath, nodeToWrite );
120
121             // 3. Read the data from Tx
122
123             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
124             assertEquals("exists", true, exists);
125
126             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
127             assertEquals("isPresent", true, optional.isPresent());
128             assertEquals("Data node", nodeToWrite, optional.get());
129
130             // 4. Ready the Tx for commit
131
132             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
133
134             // 5. Commit the Tx
135
136             doCommit(cohort);
137
138             // 6. Verify the data in the store
139
140             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
141
142             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
143             assertEquals("isPresent", true, optional.isPresent());
144             assertEquals("Data node", nodeToWrite, optional.get());
145
146             cleanup(dataStore);
147         }};
148     }
149
150     @Test
151     public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
152         new IntegrationTestKit(getSystem()) {{
153             String testName = "testTransactionWritesWithShardNotInitiallyReady";
154             String shardName = "test-1";
155
156             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
157             // initialized until we create and submit the write the Tx.
158             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
159             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
160             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
161
162             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
163
164             // Create the write Tx
165
166             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
167             assertNotNull("newReadWriteTransaction returned null", writeTx);
168
169             // Do some modification operations and ready the Tx on a separate thread.
170
171             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
172                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
173                             TestModel.ID_QNAME, 1).build();
174
175             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
176             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
177             final CountDownLatch txReady = new CountDownLatch(1);
178             Thread txThread = new Thread() {
179                 @Override
180                 public void run() {
181                     try {
182                         writeTx.write(TestModel.TEST_PATH,
183                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
184
185                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
186                                 TestModel.OUTER_LIST_QNAME).build());
187
188                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
189                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
190
191                         writeTx.delete(listEntryPath);
192
193                         txCohort.set(writeTx.ready());
194                     } catch(Exception e) {
195                         caughtEx.set(e);
196                         return;
197                     } finally {
198                         txReady.countDown();
199                     }
200                 }
201             };
202
203             txThread.start();
204
205             // Wait for the Tx operations to complete.
206
207             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
208             if(caughtEx.get() != null) {
209                 throw caughtEx.get();
210             }
211
212             assertEquals("Tx ready", true, done);
213
214             // At this point the Tx operations should be waiting for the shard to initialize so
215             // trigger the latch to let the shard recovery to continue.
216
217             blockRecoveryLatch.countDown();
218
219             // Wait for the Tx commit to complete.
220
221             doCommit(txCohort.get());
222
223             // Verify the data in the store
224
225             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
226
227             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
228                     get(5, TimeUnit.SECONDS);
229             assertEquals("isPresent", true, optional.isPresent());
230
231             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
232             assertEquals("isPresent", true, optional.isPresent());
233
234             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
235             assertEquals("isPresent", false, optional.isPresent());
236
237             cleanup(dataStore);
238         }};
239     }
240
241     @Test
242     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
243         new IntegrationTestKit(getSystem()) {{
244             String testName = "testTransactionReadsWithShardNotInitiallyReady";
245             String shardName = "test-1";
246
247             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
248             // initialized until we create the Tx.
249             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
250             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
251             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
252
253             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
254
255             // Create the read-write Tx
256
257             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
258             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
259
260             // Do some reads on the Tx on a separate thread.
261
262             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
263                     new AtomicReference<>();
264             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
265                     txReadFuture = new AtomicReference<>();
266             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
267             final CountDownLatch txReadsDone = new CountDownLatch(1);
268             Thread txThread = new Thread() {
269                 @Override
270                 public void run() {
271                     try {
272                         readWriteTx.write(TestModel.TEST_PATH,
273                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
274
275                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
276
277                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
278                     } catch(Exception e) {
279                         caughtEx.set(e);
280                         return;
281                     } finally {
282                         txReadsDone.countDown();
283                     }
284                 }
285             };
286
287             txThread.start();
288
289             // Wait for the Tx operations to complete.
290
291             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
292             if(caughtEx.get() != null) {
293                 throw caughtEx.get();
294             }
295
296             assertEquals("Tx reads done", true, done);
297
298             // At this point the Tx operations should be waiting for the shard to initialize so
299             // trigger the latch to let the shard recovery to continue.
300
301             blockRecoveryLatch.countDown();
302
303             // Wait for the reads to complete and verify.
304
305             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
306             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
307
308             readWriteTx.close();
309
310             cleanup(dataStore);
311         }};
312     }
313
314     @Test(expected=NotInitializedException.class)
315     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
316         new IntegrationTestKit(getSystem()) {{
317             String testName = "testTransactionCommitFailureWithShardNotInitialized";
318             String shardName = "test-1";
319
320             // Set the shard initialization timeout low for the test.
321
322             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
323
324             // Setup the InMemoryJournal to block shard recovery indefinitely.
325
326             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
327             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
328             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
329
330             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
331
332             // Create the write Tx
333
334             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
335             assertNotNull("newReadWriteTransaction returned null", writeTx);
336
337             // Do some modifications and ready the Tx on a separate thread.
338
339             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
340             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
341             final CountDownLatch txReady = new CountDownLatch(1);
342             Thread txThread = new Thread() {
343                 @Override
344                 public void run() {
345                     try {
346                         writeTx.write(TestModel.TEST_PATH,
347                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
348
349                         txCohort.set(writeTx.ready());
350                     } catch(Exception e) {
351                         caughtEx.set(e);
352                         return;
353                     } finally {
354                         txReady.countDown();
355                     }
356                 }
357             };
358
359             txThread.start();
360
361             // Wait for the Tx operations to complete.
362
363             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
364             if(caughtEx.get() != null) {
365                 throw caughtEx.get();
366             }
367
368             assertEquals("Tx ready", true, done);
369
370             // Wait for the commit to complete. Since the shard never initialized, the Tx should
371             // have timed out and throw an appropriate exception cause.
372
373             try {
374                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
375             } catch(ExecutionException e) {
376                 throw e.getCause();
377             } finally {
378                 blockRecoveryLatch.countDown();
379                 cleanup(dataStore);
380             }
381         }};
382     }
383
384     @Test(expected=NotInitializedException.class)
385     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
386         new IntegrationTestKit(getSystem()) {{
387             String testName = "testTransactionReadFailureWithShardNotInitialized";
388             String shardName = "test-1";
389
390             // Set the shard initialization timeout low for the test.
391
392             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
393
394             // Setup the InMemoryJournal to block shard recovery indefinitely.
395
396             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
397             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
398             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
399
400             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
401
402             // Create the read-write Tx
403
404             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
405             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
406
407             // Do a read on the Tx on a separate thread.
408
409             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
410                     txReadFuture = new AtomicReference<>();
411             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
412             final CountDownLatch txReadDone = new CountDownLatch(1);
413             Thread txThread = new Thread() {
414                 @Override
415                 public void run() {
416                     try {
417                         readWriteTx.write(TestModel.TEST_PATH,
418                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
419
420                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
421
422                         readWriteTx.close();
423                     } catch(Exception e) {
424                         caughtEx.set(e);
425                         return;
426                     } finally {
427                         txReadDone.countDown();
428                     }
429                 }
430             };
431
432             txThread.start();
433
434             // Wait for the Tx operations to complete.
435
436             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
437             if(caughtEx.get() != null) {
438                 throw caughtEx.get();
439             }
440
441             assertEquals("Tx read done", true, done);
442
443             // Wait for the read to complete. Since the shard never initialized, the Tx should
444             // have timed out and throw an appropriate exception cause.
445
446             try {
447                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
448             } catch(ReadFailedException e) {
449                 throw e.getCause();
450             } finally {
451                 blockRecoveryLatch.countDown();
452                 cleanup(dataStore);
453             }
454         }};
455     }
456
457     @Test(expected=NoShardLeaderException.class)
458     public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
459         new IntegrationTestKit(getSystem()) {{
460             String testName = "testTransactionCommitFailureWithNoShardLeader";
461             String shardName = "test-1";
462
463             // We don't want the shard to become the leader so prevent shard election from completing
464             // by setting the election timeout, which is based on the heartbeat interval, really high.
465
466             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
467
468             // Set the leader election timeout low for the test.
469
470             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
471
472             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
473
474             // Create the write Tx.
475
476             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
477             assertNotNull("newReadWriteTransaction returned null", writeTx);
478
479             // Do some modifications and ready the Tx on a separate thread.
480
481             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
482             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
483             final CountDownLatch txReady = new CountDownLatch(1);
484             Thread txThread = new Thread() {
485                 @Override
486                 public void run() {
487                     try {
488                         writeTx.write(TestModel.TEST_PATH,
489                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
490
491                         txCohort.set(writeTx.ready());
492                     } catch(Exception e) {
493                         caughtEx.set(e);
494                         return;
495                     } finally {
496                         txReady.countDown();
497                     }
498                 }
499             };
500
501             txThread.start();
502
503             // Wait for the Tx operations to complete.
504
505             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
506             if(caughtEx.get() != null) {
507                 throw caughtEx.get();
508             }
509
510             assertEquals("Tx ready", true, done);
511
512             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
513             // should have timed out and throw an appropriate exception cause.
514
515             try {
516                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
517             } catch(ExecutionException e) {
518                 throw e.getCause();
519             } finally {
520                 cleanup(dataStore);
521             }
522         }};
523     }
524
525     @Test
526     public void testTransactionAbort() throws Exception{
527         System.setProperty("shard.persistent", "true");
528         new IntegrationTestKit(getSystem()) {{
529             DistributedDataStore dataStore =
530                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
531
532             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
533             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
534
535             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
536
537             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
538
539             cohort.canCommit().get(5, TimeUnit.SECONDS);
540
541             cohort.abort().get(5, TimeUnit.SECONDS);
542
543             testWriteTransaction(dataStore, TestModel.TEST_PATH,
544                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
545
546             cleanup(dataStore);
547         }};
548     }
549
550     @Test
551     public void testTransactionChain() throws Exception{
552         new IntegrationTestKit(getSystem()) {{
553             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
554
555             // 1. Create a Tx chain and write-only Tx
556
557             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
558
559             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
560             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
561
562             // 2. Write some data
563
564             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
565             writeTx.write(TestModel.TEST_PATH, testNode);
566
567             // 3. Ready the Tx for commit
568
569             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
570
571             // 4. Commit the Tx on another thread that first waits for the second read Tx.
572
573             final CountDownLatch continueCommit1 = new CountDownLatch(1);
574             final CountDownLatch commit1Done = new CountDownLatch(1);
575             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
576             new Thread() {
577                 @Override
578                 public void run() {
579                     try {
580                         continueCommit1.await();
581                         doCommit(cohort1);
582                     } catch (Exception e) {
583                         commit1Error.set(e);
584                     } finally {
585                         commit1Done.countDown();
586                     }
587                 }
588             }.start();
589
590             // 5. Create a new read Tx from the chain to read and verify the data from the first
591             // Tx is visible after being readied.
592
593             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
594             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
595             assertEquals("isPresent", true, optional.isPresent());
596             assertEquals("Data node", testNode, optional.get());
597
598             // 6. Create a new RW Tx from the chain, write more data, and ready it
599
600             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
601             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
602             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
603
604             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
605
606             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
607             // verify it is visible.
608
609             readTx = txChain.newReadWriteTransaction();
610             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
611             assertEquals("isPresent", true, optional.isPresent());
612             assertEquals("Data node", outerNode, optional.get());
613
614             // 8. Wait for the 2 commits to complete and close the chain.
615
616             continueCommit1.countDown();
617             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
618
619             if(commit1Error.get() != null) {
620                 throw commit1Error.get();
621             }
622
623             doCommit(cohort2);
624
625             txChain.close();
626
627             // 9. Create a new read Tx from the data store and verify committed data.
628
629             readTx = dataStore.newReadOnlyTransaction();
630             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
631             assertEquals("isPresent", true, optional.isPresent());
632             assertEquals("Data node", outerNode, optional.get());
633
634             cleanup(dataStore);
635         }};
636     }
637
638     @Test
639     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
640         new IntegrationTestKit(getSystem()) {{
641             DistributedDataStore dataStore = setupDistributedDataStore(
642                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
643
644             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
645
646             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
647
648             int nTxs = 20;
649             List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
650             for(int i = 0; i < nTxs; i++) {
651                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
652
653                 rwTx.merge(TestModel.TEST_PATH, testNode);
654
655                 cohorts.add(rwTx.ready());
656
657             }
658
659             for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
660                 doCommit(cohort);
661             }
662
663             txChain.close();
664
665             cleanup(dataStore);
666         }};
667     }
668
669     @Test
670     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
671         new IntegrationTestKit(getSystem()) {{
672             DistributedDataStore dataStore = setupDistributedDataStore(
673                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
674
675             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
676
677             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
678
679             rwTx1.ready();
680
681             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
682
683             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
684             assertEquals("isPresent", false, optional.isPresent());
685
686             txChain.close();
687
688             cleanup(dataStore);
689         }};
690     }
691
692     @Test
693     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
694         new IntegrationTestKit(getSystem()) {{
695             DistributedDataStore dataStore = setupDistributedDataStore(
696                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
697
698             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
699
700             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
701             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
702
703             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
704
705             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
706             // readied.
707
708             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
709         }};
710     }
711
712     @Test
713     public void testCreateChainedTransactionAfterClose() throws Throwable {
714         new IntegrationTestKit(getSystem()) {{
715             DistributedDataStore dataStore = setupDistributedDataStore(
716                     "testCreateChainedTransactionAfterClose", "test-1");
717
718             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
719
720             txChain.close();
721
722             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
723
724             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
725         }};
726     }
727
728     @Test
729     public void testChangeListenerRegistration() throws Exception{
730         new IntegrationTestKit(getSystem()) {{
731             DistributedDataStore dataStore =
732                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
733
734             testWriteTransaction(dataStore, TestModel.TEST_PATH,
735                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
736
737             MockDataChangeListener listener = new MockDataChangeListener(1);
738
739             ListenerRegistration<MockDataChangeListener>
740                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
741                             DataChangeScope.SUBTREE);
742
743             assertNotNull("registerChangeListener returned null", listenerReg);
744
745             // Wait for the initial notification
746
747             listener.waitForChangeEvents(TestModel.TEST_PATH);
748
749             listener.reset(2);
750
751             // Write 2 updates.
752
753             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
754                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
755
756             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
757                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
758             testWriteTransaction(dataStore, listPath,
759                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
760
761             // Wait for the 2 updates.
762
763             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
764
765             listenerReg.close();
766
767             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
768                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
769                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
770
771             listener.expectNoMoreChanges("Received unexpected change after close");
772
773             cleanup(dataStore);
774         }};
775     }
776
777     class IntegrationTestKit extends ShardTestKit {
778
779         IntegrationTestKit(ActorSystem actorSystem) {
780             super(actorSystem);
781         }
782
783         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
784             return setupDistributedDataStore(typeName, true, shardNames);
785         }
786
787         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
788                 String... shardNames) {
789             MockClusterWrapper cluster = new MockClusterWrapper();
790             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
791             ShardStrategyFactory.setConfiguration(config);
792
793             datastoreContextBuilder.dataStoreType(typeName);
794
795             DatastoreContext datastoreContext = datastoreContextBuilder.build();
796
797             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
798                     config, datastoreContext);
799
800             SchemaContext schemaContext = SchemaContextHelper.full();
801             dataStore.onGlobalContextUpdated(schemaContext);
802
803             if(waitUntilLeader) {
804                 for(String shardName: shardNames) {
805                     ActorRef shard = null;
806                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
807                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
808                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
809                         if(shardReply.isPresent()) {
810                             shard = shardReply.get();
811                         }
812                     }
813
814                     assertNotNull("Shard was not created", shard);
815
816                     waitUntilLeader(shard);
817                 }
818             }
819
820             return dataStore;
821         }
822
823         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
824                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
825
826             // 1. Create a write-only Tx
827
828             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
829             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
830
831             // 2. Write some data
832
833             writeTx.write(nodePath, nodeToWrite);
834
835             // 3. Ready the Tx for commit
836
837             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
838
839             // 4. Commit the Tx
840
841             doCommit(cohort);
842
843             // 5. Verify the data in the store
844
845             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
846
847             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
848             assertEquals("isPresent", true, optional.isPresent());
849             assertEquals("Data node", nodeToWrite, optional.get());
850         }
851
852         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
853             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
854             assertEquals("canCommit", true, canCommit);
855             cohort.preCommit().get(5, TimeUnit.SECONDS);
856             cohort.commit().get(5, TimeUnit.SECONDS);
857         }
858
859         void cleanup(DistributedDataStore dataStore) {
860             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
861         }
862
863         void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
864                 throws Exception {
865             try {
866                 callable.call();
867                 fail("Expected " + expType.getSimpleName());
868             } catch(Exception e) {
869                 assertEquals("Exception type", expType, e.getClass());
870             }
871         }
872
873         void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
874                 Class<? extends Exception> expType) throws Exception {
875             assertExceptionOnCall(new Callable<Void>() {
876                 @Override
877                 public Void call() throws Exception {
878                     txChain.newWriteOnlyTransaction();
879                     return null;
880                 }
881             }, expType);
882
883             assertExceptionOnCall(new Callable<Void>() {
884                 @Override
885                 public Void call() throws Exception {
886                     txChain.newReadWriteTransaction();
887                     return null;
888                 }
889             }, expType);
890
891             assertExceptionOnCall(new Callable<Void>() {
892                 @Override
893                 public Void call() throws Exception {
894                     txChain.newReadOnlyTransaction();
895                     return null;
896                 }
897             }, expType);
898         }
899     }
900
901 }