a3c5eb4b003c666d9388e4f70ba51790c09db9dd
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
23 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
27 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
28 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44
45 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
46
47     private final DatastoreContext.Builder datastoreContextBuilder =
48             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
49
50     @Test
51     public void testWriteTransactionWithSingleShard() throws Exception{
52         new IntegrationTestKit(getSystem()) {{
53             DistributedDataStore dataStore =
54                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
55
56             testWriteTransaction(dataStore, TestModel.TEST_PATH,
57                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
58
59             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
60                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
61
62             cleanup(dataStore);
63         }};
64     }
65
66     @Test
67     public void testWriteTransactionWithMultipleShards() throws Exception{
68         new IntegrationTestKit(getSystem()) {{
69             DistributedDataStore dataStore =
70                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
71
72             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
73             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
74
75             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
76             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
77             writeTx.write(nodePath1, nodeToWrite1);
78
79             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
80             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
81             writeTx.write(nodePath2, nodeToWrite2);
82
83             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
84
85             doCommit(cohort);
86
87             // Verify the data in the store
88
89             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
90
91             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
92             assertEquals("isPresent", true, optional.isPresent());
93             assertEquals("Data node", nodeToWrite1, optional.get());
94
95             optional = readTx.read(nodePath2).get();
96             assertEquals("isPresent", true, optional.isPresent());
97             assertEquals("Data node", nodeToWrite2, optional.get());
98
99             cleanup(dataStore);
100         }};
101     }
102
103     @Test
104     public void testReadWriteTransaction() throws Exception{
105         System.setProperty("shard.persistent", "true");
106         new IntegrationTestKit(getSystem()) {{
107             DistributedDataStore dataStore =
108                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
109
110             // 1. Create a read-write Tx
111
112             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
113             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
114
115             // 2. Write some data
116
117             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
118             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
119             readWriteTx.write(nodePath, nodeToWrite );
120
121             // 3. Read the data from Tx
122
123             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
124             assertEquals("exists", true, exists);
125
126             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
127             assertEquals("isPresent", true, optional.isPresent());
128             assertEquals("Data node", nodeToWrite, optional.get());
129
130             // 4. Ready the Tx for commit
131
132             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
133
134             // 5. Commit the Tx
135
136             doCommit(cohort);
137
138             // 6. Verify the data in the store
139
140             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
141
142             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
143             assertEquals("isPresent", true, optional.isPresent());
144             assertEquals("Data node", nodeToWrite, optional.get());
145
146             cleanup(dataStore);
147         }};
148     }
149
150     @Test
151     public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{
152         new IntegrationTestKit(getSystem()) {{
153             String testName = "testTransactionWritesWithShardNotInitiallyReady";
154             String shardName = "test-1";
155
156             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
157             // initialized until we create and submit the write the Tx.
158             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
159             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
160             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
161
162             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
163
164             // Create the write Tx
165
166             // TODO - we'll want to test this with write-only as well when FindPrimary returns the leader shard.
167             final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
168             assertNotNull("newReadWriteTransaction returned null", writeTx);
169
170             // Do some modification operations and ready the Tx on a separate thread.
171
172             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
173                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
174                             TestModel.ID_QNAME, 1).build();
175
176             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
177             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
178             final CountDownLatch txReady = new CountDownLatch(1);
179             Thread txThread = new Thread() {
180                 @Override
181                 public void run() {
182                     try {
183                         writeTx.write(TestModel.TEST_PATH,
184                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
185
186                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
187                                 TestModel.OUTER_LIST_QNAME).build());
188
189                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
190                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
191
192                         writeTx.delete(listEntryPath);
193
194                         txCohort.set(writeTx.ready());
195                     } catch(Exception e) {
196                         caughtEx.set(e);
197                         return;
198                     } finally {
199                         txReady.countDown();
200                     }
201                 }
202             };
203
204             txThread.start();
205
206             // Wait for the Tx operations to complete.
207
208             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
209             if(caughtEx.get() != null) {
210                 throw caughtEx.get();
211             }
212
213             assertEquals("Tx ready", true, done);
214
215             // At this point the Tx operations should be waiting for the shard to initialize so
216             // trigger the latch to let the shard recovery to continue.
217
218             blockRecoveryLatch.countDown();
219
220             // Wait for the Tx commit to complete.
221
222             doCommit(txCohort.get());
223
224             // Verify the data in the store
225
226             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
227
228             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
229                     get(5, TimeUnit.SECONDS);
230             assertEquals("isPresent", true, optional.isPresent());
231
232             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
233             assertEquals("isPresent", true, optional.isPresent());
234
235             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
236             assertEquals("isPresent", false, optional.isPresent());
237
238             cleanup(dataStore);
239         }};
240     }
241
242     @Test
243     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{
244         new IntegrationTestKit(getSystem()) {{
245             String testName = "testTransactionReadsWithShardNotInitiallyReady";
246             String shardName = "test-1";
247
248             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
249             // initialized until we create the Tx.
250             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
251             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
252             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
253
254             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
255
256             // Create the read-write Tx
257
258             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
259             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
260
261             // Do some reads on the Tx on a separate thread.
262
263             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
264                     new AtomicReference<>();
265             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
266                     txReadFuture = new AtomicReference<>();
267             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
268             final CountDownLatch txReadsDone = new CountDownLatch(1);
269             Thread txThread = new Thread() {
270                 @Override
271                 public void run() {
272                     try {
273                         readWriteTx.write(TestModel.TEST_PATH,
274                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
275
276                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
277
278                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
279                     } catch(Exception e) {
280                         caughtEx.set(e);
281                         return;
282                     } finally {
283                         txReadsDone.countDown();
284                     }
285                 }
286             };
287
288             txThread.start();
289
290             // Wait for the Tx operations to complete.
291
292             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
293             if(caughtEx.get() != null) {
294                 throw caughtEx.get();
295             }
296
297             assertEquals("Tx reads done", true, done);
298
299             // At this point the Tx operations should be waiting for the shard to initialize so
300             // trigger the latch to let the shard recovery to continue.
301
302             blockRecoveryLatch.countDown();
303
304             // Wait for the reads to complete and verify.
305
306             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
307             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
308
309             readWriteTx.close();
310
311             cleanup(dataStore);
312         }};
313     }
314
315     @Test(expected=NotInitializedException.class)
316     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
317         new IntegrationTestKit(getSystem()) {{
318             String testName = "testTransactionCommitFailureWithShardNotInitialized";
319             String shardName = "test-1";
320
321             // Set the shard initialization timeout low for the test.
322
323             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
324
325             // Setup the InMemoryJournal to block shard recovery indefinitely.
326
327             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
328             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
329             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
330
331             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
332
333             // Create the write Tx
334
335             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
336             assertNotNull("newReadWriteTransaction returned null", writeTx);
337
338             // Do some modifications and ready the Tx on a separate thread.
339
340             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
341             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
342             final CountDownLatch txReady = new CountDownLatch(1);
343             Thread txThread = new Thread() {
344                 @Override
345                 public void run() {
346                     try {
347                         writeTx.write(TestModel.TEST_PATH,
348                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
349
350                         txCohort.set(writeTx.ready());
351                     } catch(Exception e) {
352                         caughtEx.set(e);
353                         return;
354                     } finally {
355                         txReady.countDown();
356                     }
357                 }
358             };
359
360             txThread.start();
361
362             // Wait for the Tx operations to complete.
363
364             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
365             if(caughtEx.get() != null) {
366                 throw caughtEx.get();
367             }
368
369             assertEquals("Tx ready", true, done);
370
371             // Wait for the commit to complete. Since the shard never initialized, the Tx should
372             // have timed out and throw an appropriate exception cause.
373
374             try {
375                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
376             } catch(ExecutionException e) {
377                 throw e.getCause();
378             } finally {
379                 blockRecoveryLatch.countDown();
380                 cleanup(dataStore);
381             }
382         }};
383     }
384
385     @Test(expected=NotInitializedException.class)
386     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
387         new IntegrationTestKit(getSystem()) {{
388             String testName = "testTransactionReadFailureWithShardNotInitialized";
389             String shardName = "test-1";
390
391             // Set the shard initialization timeout low for the test.
392
393             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
394
395             // Setup the InMemoryJournal to block shard recovery indefinitely.
396
397             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
398             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
399             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
400
401             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
402
403             // Create the read-write Tx
404
405             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
406             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
407
408             // Do a read on the Tx on a separate thread.
409
410             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
411                     txReadFuture = new AtomicReference<>();
412             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
413             final CountDownLatch txReadDone = new CountDownLatch(1);
414             Thread txThread = new Thread() {
415                 @Override
416                 public void run() {
417                     try {
418                         readWriteTx.write(TestModel.TEST_PATH,
419                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
420
421                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
422
423                         readWriteTx.close();
424                     } catch(Exception e) {
425                         caughtEx.set(e);
426                         return;
427                     } finally {
428                         txReadDone.countDown();
429                     }
430                 }
431             };
432
433             txThread.start();
434
435             // Wait for the Tx operations to complete.
436
437             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
438             if(caughtEx.get() != null) {
439                 throw caughtEx.get();
440             }
441
442             assertEquals("Tx read done", true, done);
443
444             // Wait for the read to complete. Since the shard never initialized, the Tx should
445             // have timed out and throw an appropriate exception cause.
446
447             try {
448                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
449             } catch(ReadFailedException e) {
450                 throw e.getCause();
451             } finally {
452                 blockRecoveryLatch.countDown();
453                 cleanup(dataStore);
454             }
455         }};
456     }
457
458     @Test(expected=NoShardLeaderException.class)
459     public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{
460         new IntegrationTestKit(getSystem()) {{
461             String testName = "testTransactionCommitFailureWithNoShardLeader";
462             String shardName = "test-1";
463
464             // We don't want the shard to become the leader so prevent shard election from completing
465             // by setting the election timeout, which is based on the heartbeat interval, really high.
466
467             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
468
469             // Set the leader election timeout low for the test.
470
471             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
472
473             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
474
475             // Create the write Tx.
476
477             final DOMStoreWriteTransaction writeTx = dataStore.newReadWriteTransaction();
478             assertNotNull("newReadWriteTransaction returned null", writeTx);
479
480             // Do some modifications and ready the Tx on a separate thread.
481
482             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
483             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
484             final CountDownLatch txReady = new CountDownLatch(1);
485             Thread txThread = new Thread() {
486                 @Override
487                 public void run() {
488                     try {
489                         writeTx.write(TestModel.TEST_PATH,
490                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
491
492                         txCohort.set(writeTx.ready());
493                     } catch(Exception e) {
494                         caughtEx.set(e);
495                         return;
496                     } finally {
497                         txReady.countDown();
498                     }
499                 }
500             };
501
502             txThread.start();
503
504             // Wait for the Tx operations to complete.
505
506             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
507             if(caughtEx.get() != null) {
508                 throw caughtEx.get();
509             }
510
511             assertEquals("Tx ready", true, done);
512
513             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
514             // should have timed out and throw an appropriate exception cause.
515
516             try {
517                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
518             } catch(ExecutionException e) {
519                 throw e.getCause();
520             } finally {
521                 cleanup(dataStore);
522             }
523         }};
524     }
525
526     @Test
527     public void testTransactionAbort() throws Exception{
528         System.setProperty("shard.persistent", "true");
529         new IntegrationTestKit(getSystem()) {{
530             DistributedDataStore dataStore =
531                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
532
533             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
534             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
535
536             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
537
538             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
539
540             cohort.canCommit().get(5, TimeUnit.SECONDS);
541
542             cohort.abort().get(5, TimeUnit.SECONDS);
543
544             testWriteTransaction(dataStore, TestModel.TEST_PATH,
545                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
546
547             cleanup(dataStore);
548         }};
549     }
550
551     @Test
552     public void testTransactionChain() throws Exception{
553         new IntegrationTestKit(getSystem()) {{
554             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
555
556             // 1. Create a Tx chain and write-only Tx
557
558             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
559
560             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
561             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
562
563             // 2. Write some data
564
565             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
566             writeTx.write(TestModel.TEST_PATH, testNode);
567
568             // 3. Ready the Tx for commit
569
570             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
571
572             // 4. Commit the Tx on another thread that first waits for the second read Tx.
573
574             final CountDownLatch continueCommit1 = new CountDownLatch(1);
575             final CountDownLatch commit1Done = new CountDownLatch(1);
576             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
577             new Thread() {
578                 @Override
579                 public void run() {
580                     try {
581                         continueCommit1.await();
582                         doCommit(cohort1);
583                     } catch (Exception e) {
584                         commit1Error.set(e);
585                     } finally {
586                         commit1Done.countDown();
587                     }
588                 }
589             }.start();
590
591             // 5. Create a new read Tx from the chain to read and verify the data from the first
592             // Tx is visible after being readied.
593
594             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
595             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
596             assertEquals("isPresent", true, optional.isPresent());
597             assertEquals("Data node", testNode, optional.get());
598
599             // 6. Create a new RW Tx from the chain, write more data, and ready it
600
601             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
602             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
603             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
604
605             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
606
607             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
608             // verify it is visible.
609
610             readTx = txChain.newReadWriteTransaction();
611             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
612             assertEquals("isPresent", true, optional.isPresent());
613             assertEquals("Data node", outerNode, optional.get());
614
615             // 8. Wait for the 2 commits to complete and close the chain.
616
617             continueCommit1.countDown();
618             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
619
620             if(commit1Error.get() != null) {
621                 throw commit1Error.get();
622             }
623
624             doCommit(cohort2);
625
626             txChain.close();
627
628             // 9. Create a new read Tx from the data store and verify committed data.
629
630             readTx = dataStore.newReadOnlyTransaction();
631             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
632             assertEquals("isPresent", true, optional.isPresent());
633             assertEquals("Data node", outerNode, optional.get());
634
635             cleanup(dataStore);
636         }};
637     }
638
639     @Test
640     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
641         new IntegrationTestKit(getSystem()) {{
642             DistributedDataStore dataStore = setupDistributedDataStore(
643                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
644
645             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
646
647             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
648
649             int nTxs = 20;
650             List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
651             for(int i = 0; i < nTxs; i++) {
652                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
653
654                 rwTx.merge(TestModel.TEST_PATH, testNode);
655
656                 cohorts.add(rwTx.ready());
657
658             }
659
660             for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
661                 doCommit(cohort);
662             }
663
664             txChain.close();
665
666             cleanup(dataStore);
667         }};
668     }
669
670     @Test
671     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
672         new IntegrationTestKit(getSystem()) {{
673             DistributedDataStore dataStore = setupDistributedDataStore(
674                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
675
676             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
677
678             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
679
680             rwTx1.ready();
681
682             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
683
684             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
685             assertEquals("isPresent", false, optional.isPresent());
686
687             txChain.close();
688
689             cleanup(dataStore);
690         }};
691     }
692
693     @Test
694     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
695         new IntegrationTestKit(getSystem()) {{
696             DistributedDataStore dataStore = setupDistributedDataStore(
697                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
698
699             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
700
701             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
702             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
703
704             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
705
706             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
707             // readied.
708
709             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
710         }};
711     }
712
713     @Test
714     public void testCreateChainedTransactionAfterClose() throws Throwable {
715         new IntegrationTestKit(getSystem()) {{
716             DistributedDataStore dataStore = setupDistributedDataStore(
717                     "testCreateChainedTransactionAfterClose", "test-1");
718
719             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
720
721             txChain.close();
722
723             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
724
725             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
726         }};
727     }
728
729     @Test
730     public void testChangeListenerRegistration() throws Exception{
731         new IntegrationTestKit(getSystem()) {{
732             DistributedDataStore dataStore =
733                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
734
735             testWriteTransaction(dataStore, TestModel.TEST_PATH,
736                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
737
738             MockDataChangeListener listener = new MockDataChangeListener(1);
739
740             ListenerRegistration<MockDataChangeListener>
741                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
742                             DataChangeScope.SUBTREE);
743
744             assertNotNull("registerChangeListener returned null", listenerReg);
745
746             // Wait for the initial notification
747
748             listener.waitForChangeEvents(TestModel.TEST_PATH);
749
750             listener.reset(2);
751
752             // Write 2 updates.
753
754             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
755                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
756
757             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
758                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
759             testWriteTransaction(dataStore, listPath,
760                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
761
762             // Wait for the 2 updates.
763
764             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
765
766             listenerReg.close();
767
768             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
769                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
770                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
771
772             listener.expectNoMoreChanges("Received unexpected change after close");
773
774             cleanup(dataStore);
775         }};
776     }
777
778     class IntegrationTestKit extends ShardTestKit {
779
780         IntegrationTestKit(ActorSystem actorSystem) {
781             super(actorSystem);
782         }
783
784         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
785             return setupDistributedDataStore(typeName, true, shardNames);
786         }
787
788         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
789                 String... shardNames) {
790             MockClusterWrapper cluster = new MockClusterWrapper();
791             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
792             ShardStrategyFactory.setConfiguration(config);
793
794             datastoreContextBuilder.dataStoreType(typeName);
795
796             DatastoreContext datastoreContext = datastoreContextBuilder.build();
797
798             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
799                     config, datastoreContext);
800
801             SchemaContext schemaContext = SchemaContextHelper.full();
802             dataStore.onGlobalContextUpdated(schemaContext);
803
804             if(waitUntilLeader) {
805                 for(String shardName: shardNames) {
806                     ActorRef shard = null;
807                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
808                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
809                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
810                         if(shardReply.isPresent()) {
811                             shard = shardReply.get();
812                         }
813                     }
814
815                     assertNotNull("Shard was not created", shard);
816
817                     waitUntilLeader(shard);
818                 }
819             }
820
821             return dataStore;
822         }
823
824         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
825                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
826
827             // 1. Create a write-only Tx
828
829             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
830             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
831
832             // 2. Write some data
833
834             writeTx.write(nodePath, nodeToWrite);
835
836             // 3. Ready the Tx for commit
837
838             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
839
840             // 4. Commit the Tx
841
842             doCommit(cohort);
843
844             // 5. Verify the data in the store
845
846             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
847
848             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
849             assertEquals("isPresent", true, optional.isPresent());
850             assertEquals("Data node", nodeToWrite, optional.get());
851         }
852
853         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
854             Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
855             assertEquals("canCommit", true, canCommit);
856             cohort.preCommit().get(5, TimeUnit.SECONDS);
857             cohort.commit().get(5, TimeUnit.SECONDS);
858         }
859
860         void cleanup(DistributedDataStore dataStore) {
861             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
862         }
863
864         void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
865                 throws Exception {
866             try {
867                 callable.call();
868                 fail("Expected " + expType.getSimpleName());
869             } catch(Exception e) {
870                 assertEquals("Exception type", expType, e.getClass());
871             }
872         }
873
874         void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
875                 Class<? extends Exception> expType) throws Exception {
876             assertExceptionOnCall(new Callable<Void>() {
877                 @Override
878                 public Void call() throws Exception {
879                     txChain.newWriteOnlyTransaction();
880                     return null;
881                 }
882             }, expType);
883
884             assertExceptionOnCall(new Callable<Void>() {
885                 @Override
886                 public Void call() throws Exception {
887                     txChain.newReadWriteTransaction();
888                     return null;
889                 }
890             }, expType);
891
892             assertExceptionOnCall(new Callable<Void>() {
893                 @Override
894                 public Void call() throws Exception {
895                     txChain.newReadOnlyTransaction();
896                     return null;
897                 }
898             }, expType);
899         }
900     }
901
902 }