Merge "Event Source: switch from wildcards to regexs"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import akka.actor.ActorRef;
7 import akka.actor.ActorSystem;
8 import akka.actor.PoisonPill;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.Uninterruptibles;
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
23 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
25 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
26 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
27 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
28 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
42 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44
45 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
46
47     private final DatastoreContext.Builder datastoreContextBuilder =
48             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
49
50     @Test
51     public void testWriteTransactionWithSingleShard() throws Exception{
52         new IntegrationTestKit(getSystem()) {{
53             DistributedDataStore dataStore =
54                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
55
56             testWriteTransaction(dataStore, TestModel.TEST_PATH,
57                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
58
59             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
60                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
61
62             cleanup(dataStore);
63         }};
64     }
65
66     @Test
67     public void testWriteTransactionWithMultipleShards() throws Exception{
68         new IntegrationTestKit(getSystem()) {{
69             DistributedDataStore dataStore =
70                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
71
72             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
73             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
74
75             YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
76             NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
77             writeTx.write(nodePath1, nodeToWrite1);
78
79             YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
80             NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
81             writeTx.write(nodePath2, nodeToWrite2);
82
83             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
84
85             doCommit(cohort);
86
87             // Verify the data in the store
88
89             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
90
91             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
92             assertEquals("isPresent", true, optional.isPresent());
93             assertEquals("Data node", nodeToWrite1, optional.get());
94
95             optional = readTx.read(nodePath2).get();
96             assertEquals("isPresent", true, optional.isPresent());
97             assertEquals("Data node", nodeToWrite2, optional.get());
98
99             cleanup(dataStore);
100         }};
101     }
102
103     @Test
104     public void testReadWriteTransaction() throws Exception{
105         System.setProperty("shard.persistent", "true");
106         new IntegrationTestKit(getSystem()) {{
107             DistributedDataStore dataStore =
108                     setupDistributedDataStore("testReadWriteTransaction", "test-1");
109
110             // 1. Create a read-write Tx
111
112             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
113             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
114
115             // 2. Write some data
116
117             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
118             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
119             readWriteTx.write(nodePath, nodeToWrite );
120
121             // 3. Read the data from Tx
122
123             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
124             assertEquals("exists", true, exists);
125
126             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
127             assertEquals("isPresent", true, optional.isPresent());
128             assertEquals("Data node", nodeToWrite, optional.get());
129
130             // 4. Ready the Tx for commit
131
132             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
133
134             // 5. Commit the Tx
135
136             doCommit(cohort);
137
138             // 6. Verify the data in the store
139
140             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
141
142             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
143             assertEquals("isPresent", true, optional.isPresent());
144             assertEquals("Data node", nodeToWrite, optional.get());
145
146             cleanup(dataStore);
147         }};
148     }
149
150     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
151             final boolean writeOnly) throws Exception {
152         new IntegrationTestKit(getSystem()) {{
153             String shardName = "test-1";
154
155             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
156             // initialized until we create and submit the write the Tx.
157             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
158             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
159             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
160
161             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
162
163             // Create the write Tx
164
165             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
166                     dataStore.newReadWriteTransaction();
167             assertNotNull("newReadWriteTransaction returned null", writeTx);
168
169             // Do some modification operations and ready the Tx on a separate thread.
170
171             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
172                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
173                             TestModel.ID_QNAME, 1).build();
174
175             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
176             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
177             final CountDownLatch txReady = new CountDownLatch(1);
178             Thread txThread = new Thread() {
179                 @Override
180                 public void run() {
181                     try {
182                         writeTx.write(TestModel.TEST_PATH,
183                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
184
185                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
186                                 TestModel.OUTER_LIST_QNAME).build());
187
188                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
189                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
190
191                         writeTx.delete(listEntryPath);
192
193                         txCohort.set(writeTx.ready());
194                     } catch(Exception e) {
195                         caughtEx.set(e);
196                         return;
197                     } finally {
198                         txReady.countDown();
199                     }
200                 }
201             };
202
203             txThread.start();
204
205             // Wait for the Tx operations to complete.
206
207             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
208             if(caughtEx.get() != null) {
209                 throw caughtEx.get();
210             }
211
212             assertEquals("Tx ready", true, done);
213
214             // At this point the Tx operations should be waiting for the shard to initialize so
215             // trigger the latch to let the shard recovery to continue.
216
217             blockRecoveryLatch.countDown();
218
219             // Wait for the Tx commit to complete.
220
221             doCommit(txCohort.get());
222
223             // Verify the data in the store
224
225             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
226
227             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
228                     get(5, TimeUnit.SECONDS);
229             assertEquals("isPresent", true, optional.isPresent());
230
231             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
232             assertEquals("isPresent", true, optional.isPresent());
233
234             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
235             assertEquals("isPresent", false, optional.isPresent());
236
237             cleanup(dataStore);
238         }};
239     }
240
241     @Test
242     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
243         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
244         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
245     }
246
247     @Test
248     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
249         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
250     }
251
252     @Test
253     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
254         new IntegrationTestKit(getSystem()) {{
255             String testName = "testTransactionReadsWithShardNotInitiallyReady";
256             String shardName = "test-1";
257
258             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
259             // initialized until we create the Tx.
260             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
261             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
262             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
263
264             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
265
266             // Create the read-write Tx
267
268             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
269             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
270
271             // Do some reads on the Tx on a separate thread.
272
273             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
274                     new AtomicReference<>();
275             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
276                     txReadFuture = new AtomicReference<>();
277             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
278             final CountDownLatch txReadsDone = new CountDownLatch(1);
279             Thread txThread = new Thread() {
280                 @Override
281                 public void run() {
282                     try {
283                         readWriteTx.write(TestModel.TEST_PATH,
284                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285
286                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
287
288                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
289                     } catch(Exception e) {
290                         caughtEx.set(e);
291                         return;
292                     } finally {
293                         txReadsDone.countDown();
294                     }
295                 }
296             };
297
298             txThread.start();
299
300             // Wait for the Tx operations to complete.
301
302             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
303             if(caughtEx.get() != null) {
304                 throw caughtEx.get();
305             }
306
307             assertEquals("Tx reads done", true, done);
308
309             // At this point the Tx operations should be waiting for the shard to initialize so
310             // trigger the latch to let the shard recovery to continue.
311
312             blockRecoveryLatch.countDown();
313
314             // Wait for the reads to complete and verify.
315
316             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
317             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
318
319             readWriteTx.close();
320
321             cleanup(dataStore);
322         }};
323     }
324
325     @Test(expected=NotInitializedException.class)
326     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
327         new IntegrationTestKit(getSystem()) {{
328             String testName = "testTransactionCommitFailureWithShardNotInitialized";
329             String shardName = "test-1";
330
331             // Set the shard initialization timeout low for the test.
332
333             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
334
335             // Setup the InMemoryJournal to block shard recovery indefinitely.
336
337             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
338             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
339             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
340
341             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
342
343             // Create the write Tx
344
345             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
346             assertNotNull("newReadWriteTransaction returned null", writeTx);
347
348             // Do some modifications and ready the Tx on a separate thread.
349
350             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
351             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
352             final CountDownLatch txReady = new CountDownLatch(1);
353             Thread txThread = new Thread() {
354                 @Override
355                 public void run() {
356                     try {
357                         writeTx.write(TestModel.TEST_PATH,
358                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
359
360                         txCohort.set(writeTx.ready());
361                     } catch(Exception e) {
362                         caughtEx.set(e);
363                         return;
364                     } finally {
365                         txReady.countDown();
366                     }
367                 }
368             };
369
370             txThread.start();
371
372             // Wait for the Tx operations to complete.
373
374             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
375             if(caughtEx.get() != null) {
376                 throw caughtEx.get();
377             }
378
379             assertEquals("Tx ready", true, done);
380
381             // Wait for the commit to complete. Since the shard never initialized, the Tx should
382             // have timed out and throw an appropriate exception cause.
383
384             try {
385                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
386             } catch(ExecutionException e) {
387                 throw e.getCause();
388             } finally {
389                 blockRecoveryLatch.countDown();
390                 cleanup(dataStore);
391             }
392         }};
393     }
394
395     @Test(expected=NotInitializedException.class)
396     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
397         new IntegrationTestKit(getSystem()) {{
398             String testName = "testTransactionReadFailureWithShardNotInitialized";
399             String shardName = "test-1";
400
401             // Set the shard initialization timeout low for the test.
402
403             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
404
405             // Setup the InMemoryJournal to block shard recovery indefinitely.
406
407             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
408             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
409             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
410
411             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
412
413             // Create the read-write Tx
414
415             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
416             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
417
418             // Do a read on the Tx on a separate thread.
419
420             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
421                     txReadFuture = new AtomicReference<>();
422             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
423             final CountDownLatch txReadDone = new CountDownLatch(1);
424             Thread txThread = new Thread() {
425                 @Override
426                 public void run() {
427                     try {
428                         readWriteTx.write(TestModel.TEST_PATH,
429                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
430
431                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
432
433                         readWriteTx.close();
434                     } catch(Exception e) {
435                         caughtEx.set(e);
436                         return;
437                     } finally {
438                         txReadDone.countDown();
439                     }
440                 }
441             };
442
443             txThread.start();
444
445             // Wait for the Tx operations to complete.
446
447             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
448             if(caughtEx.get() != null) {
449                 throw caughtEx.get();
450             }
451
452             assertEquals("Tx read done", true, done);
453
454             // Wait for the read to complete. Since the shard never initialized, the Tx should
455             // have timed out and throw an appropriate exception cause.
456
457             try {
458                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
459             } catch(ReadFailedException e) {
460                 throw e.getCause();
461             } finally {
462                 blockRecoveryLatch.countDown();
463                 cleanup(dataStore);
464             }
465         }};
466     }
467
468     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
469         new IntegrationTestKit(getSystem()) {{
470             String testName = "testTransactionCommitFailureWithNoShardLeader";
471             String shardName = "default";
472
473             // We don't want the shard to become the leader so prevent shard election from completing
474             // by setting the election timeout, which is based on the heartbeat interval, really high.
475
476             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
477             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
478
479             // Set the leader election timeout low for the test.
480
481             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
482
483             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
484
485             // Create the write Tx.
486
487             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
488                 dataStore.newReadWriteTransaction();
489             assertNotNull("newReadWriteTransaction returned null", writeTx);
490
491             // Do some modifications and ready the Tx on a separate thread.
492
493             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
494             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
495             final CountDownLatch txReady = new CountDownLatch(1);
496             Thread txThread = new Thread() {
497                 @Override
498                 public void run() {
499                     try {
500                         writeTx.write(TestModel.JUNK_PATH,
501                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
502
503                         txCohort.set(writeTx.ready());
504                     } catch(Exception e) {
505                         caughtEx.set(e);
506                         return;
507                     } finally {
508                         txReady.countDown();
509                     }
510                 }
511             };
512
513             txThread.start();
514
515             // Wait for the Tx operations to complete.
516
517             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
518             if(caughtEx.get() != null) {
519                 throw caughtEx.get();
520             }
521
522             assertEquals("Tx ready", true, done);
523
524             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
525             // should have timed out and throw an appropriate exception cause.
526
527             try {
528                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
529             } catch(ExecutionException e) {
530                 throw e.getCause();
531             } finally {
532                 cleanup(dataStore);
533             }
534         }};
535     }
536
537     @Test(expected=NoShardLeaderException.class)
538     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
539         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
540         testTransactionCommitFailureWithNoShardLeader(true);
541     }
542
543     @Test(expected=NoShardLeaderException.class)
544     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
545         testTransactionCommitFailureWithNoShardLeader(false);
546     }
547
548     @Test
549     public void testTransactionAbort() throws Exception{
550         System.setProperty("shard.persistent", "true");
551         new IntegrationTestKit(getSystem()) {{
552             DistributedDataStore dataStore =
553                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
554
555             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
556             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
557
558             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
559
560             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
561
562             cohort.canCommit().get(5, TimeUnit.SECONDS);
563
564             cohort.abort().get(5, TimeUnit.SECONDS);
565
566             testWriteTransaction(dataStore, TestModel.TEST_PATH,
567                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
568
569             cleanup(dataStore);
570         }};
571     }
572
573     @Test
574     public void testTransactionChain() throws Exception{
575         new IntegrationTestKit(getSystem()) {{
576             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
577
578             // 1. Create a Tx chain and write-only Tx
579
580             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
581
582             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
583             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
584
585             // 2. Write some data
586
587             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
588             writeTx.write(TestModel.TEST_PATH, testNode);
589
590             // 3. Ready the Tx for commit
591
592             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
593
594             // 4. Commit the Tx on another thread that first waits for the second read Tx.
595
596             final CountDownLatch continueCommit1 = new CountDownLatch(1);
597             final CountDownLatch commit1Done = new CountDownLatch(1);
598             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
599             new Thread() {
600                 @Override
601                 public void run() {
602                     try {
603                         continueCommit1.await();
604                         doCommit(cohort1);
605                     } catch (Exception e) {
606                         commit1Error.set(e);
607                     } finally {
608                         commit1Done.countDown();
609                     }
610                 }
611             }.start();
612
613             // 5. Create a new read Tx from the chain to read and verify the data from the first
614             // Tx is visible after being readied.
615
616             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
617             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
618             assertEquals("isPresent", true, optional.isPresent());
619             assertEquals("Data node", testNode, optional.get());
620
621             // 6. Create a new RW Tx from the chain, write more data, and ready it
622
623             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
624             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
625             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
626
627             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
628
629             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
630             // verify it is visible.
631
632             readTx = txChain.newReadWriteTransaction();
633             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
634             assertEquals("isPresent", true, optional.isPresent());
635             assertEquals("Data node", outerNode, optional.get());
636
637             // 8. Wait for the 2 commits to complete and close the chain.
638
639             continueCommit1.countDown();
640             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
641
642             if(commit1Error.get() != null) {
643                 throw commit1Error.get();
644             }
645
646             doCommit(cohort2);
647
648             txChain.close();
649
650             // 9. Create a new read Tx from the data store and verify committed data.
651
652             readTx = dataStore.newReadOnlyTransaction();
653             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
654             assertEquals("isPresent", true, optional.isPresent());
655             assertEquals("Data node", outerNode, optional.get());
656
657             cleanup(dataStore);
658         }};
659     }
660
661     @Test
662     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
663         new IntegrationTestKit(getSystem()) {{
664             DistributedDataStore dataStore = setupDistributedDataStore(
665                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
666
667             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
668
669             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
670
671             int nTxs = 20;
672             List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
673             for(int i = 0; i < nTxs; i++) {
674                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
675
676                 rwTx.merge(TestModel.TEST_PATH, testNode);
677
678                 cohorts.add(rwTx.ready());
679
680             }
681
682             for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
683                 doCommit(cohort);
684             }
685
686             txChain.close();
687
688             cleanup(dataStore);
689         }};
690     }
691
692     @Test
693     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
694         new IntegrationTestKit(getSystem()) {{
695             DistributedDataStore dataStore = setupDistributedDataStore(
696                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
697
698             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
699
700             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
701
702             rwTx1.ready();
703
704             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
705
706             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
707             assertEquals("isPresent", false, optional.isPresent());
708
709             txChain.close();
710
711             cleanup(dataStore);
712         }};
713     }
714
715     @Test
716     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
717         new IntegrationTestKit(getSystem()) {{
718             DistributedDataStore dataStore = setupDistributedDataStore(
719                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
720
721             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
722
723             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
724             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
725
726             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
727
728             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
729             // readied.
730
731             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
732         }};
733     }
734
735     @Test
736     public void testCreateChainedTransactionAfterClose() throws Throwable {
737         new IntegrationTestKit(getSystem()) {{
738             DistributedDataStore dataStore = setupDistributedDataStore(
739                     "testCreateChainedTransactionAfterClose", "test-1");
740
741             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
742
743             txChain.close();
744
745             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
746
747             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
748         }};
749     }
750
751     @Test
752     public void testChangeListenerRegistration() throws Exception{
753         new IntegrationTestKit(getSystem()) {{
754             DistributedDataStore dataStore =
755                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
756
757             testWriteTransaction(dataStore, TestModel.TEST_PATH,
758                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
759
760             MockDataChangeListener listener = new MockDataChangeListener(1);
761
762             ListenerRegistration<MockDataChangeListener>
763                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
764                             DataChangeScope.SUBTREE);
765
766             assertNotNull("registerChangeListener returned null", listenerReg);
767
768             // Wait for the initial notification
769
770             listener.waitForChangeEvents(TestModel.TEST_PATH);
771
772             listener.reset(2);
773
774             // Write 2 updates.
775
776             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
777                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
778
779             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
780                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
781             testWriteTransaction(dataStore, listPath,
782                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
783
784             // Wait for the 2 updates.
785
786             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
787
788             listenerReg.close();
789
790             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
791                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
792                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
793
794             listener.expectNoMoreChanges("Received unexpected change after close");
795
796             cleanup(dataStore);
797         }};
798     }
799
800     class IntegrationTestKit extends ShardTestKit {
801
802         IntegrationTestKit(ActorSystem actorSystem) {
803             super(actorSystem);
804         }
805
806         DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
807             return setupDistributedDataStore(typeName, true, shardNames);
808         }
809
810         DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
811                 String... shardNames) {
812             MockClusterWrapper cluster = new MockClusterWrapper();
813             Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
814             ShardStrategyFactory.setConfiguration(config);
815
816             datastoreContextBuilder.dataStoreType(typeName);
817
818             DatastoreContext datastoreContext = datastoreContextBuilder.build();
819
820             DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
821                     config, datastoreContext);
822
823             SchemaContext schemaContext = SchemaContextHelper.full();
824             dataStore.onGlobalContextUpdated(schemaContext);
825
826             if(waitUntilLeader) {
827                 for(String shardName: shardNames) {
828                     ActorRef shard = null;
829                     for(int i = 0; i < 20 * 5 && shard == null; i++) {
830                         Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
831                         Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
832                         if(shardReply.isPresent()) {
833                             shard = shardReply.get();
834                         }
835                     }
836
837                     assertNotNull("Shard was not created", shard);
838
839                     waitUntilLeader(shard);
840                 }
841             }
842
843             return dataStore;
844         }
845
846         void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
847                 NormalizedNode<?, ?> nodeToWrite) throws Exception {
848
849             // 1. Create a write-only Tx
850
851             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
852             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
853
854             // 2. Write some data
855
856             writeTx.write(nodePath, nodeToWrite);
857
858             // 3. Ready the Tx for commit
859
860             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
861
862             // 4. Commit the Tx
863
864             doCommit(cohort);
865
866             // 5. Verify the data in the store
867
868             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
869
870             Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
871             assertEquals("isPresent", true, optional.isPresent());
872             assertEquals("Data node", nodeToWrite, optional.get());
873         }
874
875         void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
876             Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
877             assertEquals("canCommit", true, canCommit);
878             cohort.preCommit().get(5, TimeUnit.SECONDS);
879             cohort.commit().get(5, TimeUnit.SECONDS);
880         }
881
882         void cleanup(DistributedDataStore dataStore) {
883             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
884         }
885
886         void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
887                 throws Exception {
888             try {
889                 callable.call();
890                 fail("Expected " + expType.getSimpleName());
891             } catch(Exception e) {
892                 assertEquals("Exception type", expType, e.getClass());
893             }
894         }
895
896         void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
897                 Class<? extends Exception> expType) throws Exception {
898             assertExceptionOnCall(new Callable<Void>() {
899                 @Override
900                 public Void call() throws Exception {
901                     txChain.newWriteOnlyTransaction();
902                     return null;
903                 }
904             }, expType);
905
906             assertExceptionOnCall(new Callable<Void>() {
907                 @Override
908                 public Void call() throws Exception {
909                     txChain.newReadWriteTransaction();
910                     return null;
911                 }
912             }, expType);
913
914             assertExceptionOnCall(new Callable<Void>() {
915                 @Override
916                 public Void call() throws Exception {
917                     txChain.newReadOnlyTransaction();
918                     return null;
919                 }
920             }, expType);
921         }
922     }
923
924 }