76ae3c71566bdce5663918e05a7f1d6cf54e352b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.Uninterruptibles;
14 import com.typesafe.config.ConfigFactory;
15 import java.io.IOException;
16 import java.math.BigInteger;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.AfterClass;
24 import org.junit.BeforeClass;
25 import org.junit.Test;
26 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
27 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
28 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
29 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
30 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
31 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47
48 public class DistributedDataStoreIntegrationTest {
49
50     private static ActorSystem system;
51
52     private final DatastoreContext.Builder datastoreContextBuilder =
53             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
54
55     @BeforeClass
56     public static void setUpClass() throws IOException {
57         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
58         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
59         Cluster.get(system).join(member1Address);
60     }
61
62     @AfterClass
63     public static void tearDownClass() throws IOException {
64         JavaTestKit.shutdownActorSystem(system);
65         system = null;
66     }
67
68     protected ActorSystem getSystem() {
69         return system;
70     }
71
72     @Test
73     public void testWriteTransactionWithSingleShard() throws Exception{
74         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
75             DistributedDataStore dataStore =
76                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
77
78             testWriteTransaction(dataStore, TestModel.TEST_PATH,
79                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
80
81             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
82                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
83
84             cleanup(dataStore);
85         }};
86     }
87
88     @Test
89     public void testWriteTransactionWithMultipleShards() throws Exception{
90         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
91             DistributedDataStore dataStore =
92                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
93
94             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
95             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
96
97             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
98             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
99
100             doCommit(writeTx.ready());
101
102             writeTx = dataStore.newWriteOnlyTransaction();
103
104             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
105             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
106
107             doCommit(writeTx.ready());
108
109             writeTx = dataStore.newWriteOnlyTransaction();
110
111             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
112             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
113             writeTx.write(carPath, car);
114
115             MapEntryNode person = PeopleModel.newPersonEntry("jack");
116             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
117             writeTx.write(personPath, person);
118
119             doCommit(writeTx.ready());
120
121             // Verify the data in the store
122
123             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
124
125             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
126             assertEquals("isPresent", true, optional.isPresent());
127             assertEquals("Data node", car, optional.get());
128
129             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
130             assertEquals("isPresent", true, optional.isPresent());
131             assertEquals("Data node", person, optional.get());
132
133             cleanup(dataStore);
134         }};
135     }
136
137     @Test
138     public void testReadWriteTransactionWithSingleShard() throws Exception{
139         System.setProperty("shard.persistent", "true");
140         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
141             DistributedDataStore dataStore =
142                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
143
144             // 1. Create a read-write Tx
145
146             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
147             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
148
149             // 2. Write some data
150
151             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
152             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
153             readWriteTx.write(nodePath, nodeToWrite );
154
155             // 3. Read the data from Tx
156
157             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
158             assertEquals("exists", true, exists);
159
160             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
161             assertEquals("isPresent", true, optional.isPresent());
162             assertEquals("Data node", nodeToWrite, optional.get());
163
164             // 4. Ready the Tx for commit
165
166             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
167
168             // 5. Commit the Tx
169
170             doCommit(cohort);
171
172             // 6. Verify the data in the store
173
174             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
175
176             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
177             assertEquals("isPresent", true, optional.isPresent());
178             assertEquals("Data node", nodeToWrite, optional.get());
179
180             cleanup(dataStore);
181         }};
182     }
183
184     @Test
185     public void testReadWriteTransactionWithMultipleShards() throws Exception{
186         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
187             DistributedDataStore dataStore =
188                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
189
190             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
191             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
192
193             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
194             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
195
196             doCommit(readWriteTx.ready());
197
198             readWriteTx = dataStore.newReadWriteTransaction();
199
200             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
201             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
202
203             doCommit(readWriteTx.ready());
204
205             readWriteTx = dataStore.newReadWriteTransaction();
206
207             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
208             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
209             readWriteTx.write(carPath, car);
210
211             MapEntryNode person = PeopleModel.newPersonEntry("jack");
212             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
213             readWriteTx.write(personPath, person);
214
215             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
216             assertEquals("exists", true, exists);
217
218             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
219             assertEquals("isPresent", true, optional.isPresent());
220             assertEquals("Data node", car, optional.get());
221
222             doCommit(readWriteTx.ready());
223
224             // Verify the data in the store
225
226             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
227
228             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
229             assertEquals("isPresent", true, optional.isPresent());
230             assertEquals("Data node", car, optional.get());
231
232             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
233             assertEquals("isPresent", true, optional.isPresent());
234             assertEquals("Data node", person, optional.get());
235
236             cleanup(dataStore);
237         }};
238     }
239
240     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
241             final boolean writeOnly) throws Exception {
242         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
243             String shardName = "test-1";
244
245             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
246             // initialized until we create and submit the write the Tx.
247             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
248             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
249             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
250
251             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
252
253             // Create the write Tx
254
255             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
256                     dataStore.newReadWriteTransaction();
257             assertNotNull("newReadWriteTransaction returned null", writeTx);
258
259             // Do some modification operations and ready the Tx on a separate thread.
260
261             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
262                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
263                             TestModel.ID_QNAME, 1).build();
264
265             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
266             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
267             final CountDownLatch txReady = new CountDownLatch(1);
268             Thread txThread = new Thread() {
269                 @Override
270                 public void run() {
271                     try {
272                         writeTx.write(TestModel.TEST_PATH,
273                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
274
275                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
276                                 TestModel.OUTER_LIST_QNAME).build());
277
278                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
279                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
280
281                         writeTx.delete(listEntryPath);
282
283                         txCohort.set(writeTx.ready());
284                     } catch(Exception e) {
285                         caughtEx.set(e);
286                         return;
287                     } finally {
288                         txReady.countDown();
289                     }
290                 }
291             };
292
293             txThread.start();
294
295             // Wait for the Tx operations to complete.
296
297             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
298             if(caughtEx.get() != null) {
299                 throw caughtEx.get();
300             }
301
302             assertEquals("Tx ready", true, done);
303
304             // At this point the Tx operations should be waiting for the shard to initialize so
305             // trigger the latch to let the shard recovery to continue.
306
307             blockRecoveryLatch.countDown();
308
309             // Wait for the Tx commit to complete.
310
311             doCommit(txCohort.get());
312
313             // Verify the data in the store
314
315             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
316
317             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
318                     get(5, TimeUnit.SECONDS);
319             assertEquals("isPresent", true, optional.isPresent());
320
321             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
322             assertEquals("isPresent", true, optional.isPresent());
323
324             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
325             assertEquals("isPresent", false, optional.isPresent());
326
327             cleanup(dataStore);
328         }};
329     }
330
331     @Test
332     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
333         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
334         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
335     }
336
337     @Test
338     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
339         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
340     }
341
342     @Test
343     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
344         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
345             String testName = "testTransactionReadsWithShardNotInitiallyReady";
346             String shardName = "test-1";
347
348             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
349             // initialized until we create the Tx.
350             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
351             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
352             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
353
354             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
355
356             // Create the read-write Tx
357
358             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
359             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
360
361             // Do some reads on the Tx on a separate thread.
362
363             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
364                     new AtomicReference<>();
365             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
366                     txReadFuture = new AtomicReference<>();
367             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
368             final CountDownLatch txReadsDone = new CountDownLatch(1);
369             Thread txThread = new Thread() {
370                 @Override
371                 public void run() {
372                     try {
373                         readWriteTx.write(TestModel.TEST_PATH,
374                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
375
376                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
377
378                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
379                     } catch(Exception e) {
380                         caughtEx.set(e);
381                         return;
382                     } finally {
383                         txReadsDone.countDown();
384                     }
385                 }
386             };
387
388             txThread.start();
389
390             // Wait for the Tx operations to complete.
391
392             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
393             if(caughtEx.get() != null) {
394                 throw caughtEx.get();
395             }
396
397             assertEquals("Tx reads done", true, done);
398
399             // At this point the Tx operations should be waiting for the shard to initialize so
400             // trigger the latch to let the shard recovery to continue.
401
402             blockRecoveryLatch.countDown();
403
404             // Wait for the reads to complete and verify.
405
406             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
407             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
408
409             readWriteTx.close();
410
411             cleanup(dataStore);
412         }};
413     }
414
415     @Test(expected=NotInitializedException.class)
416     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
417         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
418             String testName = "testTransactionCommitFailureWithShardNotInitialized";
419             String shardName = "test-1";
420
421             // Set the shard initialization timeout low for the test.
422
423             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
424
425             // Setup the InMemoryJournal to block shard recovery indefinitely.
426
427             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
428             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
429             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
430
431             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
432
433             // Create the write Tx
434
435             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
436             assertNotNull("newReadWriteTransaction returned null", writeTx);
437
438             // Do some modifications and ready the Tx on a separate thread.
439
440             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
441             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
442             final CountDownLatch txReady = new CountDownLatch(1);
443             Thread txThread = new Thread() {
444                 @Override
445                 public void run() {
446                     try {
447                         writeTx.write(TestModel.TEST_PATH,
448                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
449
450                         txCohort.set(writeTx.ready());
451                     } catch(Exception e) {
452                         caughtEx.set(e);
453                         return;
454                     } finally {
455                         txReady.countDown();
456                     }
457                 }
458             };
459
460             txThread.start();
461
462             // Wait for the Tx operations to complete.
463
464             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
465             if(caughtEx.get() != null) {
466                 throw caughtEx.get();
467             }
468
469             assertEquals("Tx ready", true, done);
470
471             // Wait for the commit to complete. Since the shard never initialized, the Tx should
472             // have timed out and throw an appropriate exception cause.
473
474             try {
475                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
476             } catch(ExecutionException e) {
477                 throw e.getCause();
478             } finally {
479                 blockRecoveryLatch.countDown();
480                 cleanup(dataStore);
481             }
482         }};
483     }
484
485     @Test(expected=NotInitializedException.class)
486     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
487         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
488             String testName = "testTransactionReadFailureWithShardNotInitialized";
489             String shardName = "test-1";
490
491             // Set the shard initialization timeout low for the test.
492
493             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
494
495             // Setup the InMemoryJournal to block shard recovery indefinitely.
496
497             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
498             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
499             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
500
501             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
502
503             // Create the read-write Tx
504
505             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
506             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
507
508             // Do a read on the Tx on a separate thread.
509
510             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
511                     txReadFuture = new AtomicReference<>();
512             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
513             final CountDownLatch txReadDone = new CountDownLatch(1);
514             Thread txThread = new Thread() {
515                 @Override
516                 public void run() {
517                     try {
518                         readWriteTx.write(TestModel.TEST_PATH,
519                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
520
521                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
522
523                         readWriteTx.close();
524                     } catch(Exception e) {
525                         caughtEx.set(e);
526                         return;
527                     } finally {
528                         txReadDone.countDown();
529                     }
530                 }
531             };
532
533             txThread.start();
534
535             // Wait for the Tx operations to complete.
536
537             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
538             if(caughtEx.get() != null) {
539                 throw caughtEx.get();
540             }
541
542             assertEquals("Tx read done", true, done);
543
544             // Wait for the read to complete. Since the shard never initialized, the Tx should
545             // have timed out and throw an appropriate exception cause.
546
547             try {
548                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
549             } catch(ReadFailedException e) {
550                 throw e.getCause();
551             } finally {
552                 blockRecoveryLatch.countDown();
553                 cleanup(dataStore);
554             }
555         }};
556     }
557
558     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
559         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
560             String testName = "testTransactionCommitFailureWithNoShardLeader";
561             String shardName = "default";
562
563             // We don't want the shard to become the leader so prevent shard election from completing
564             // by setting the election timeout, which is based on the heartbeat interval, really high.
565
566             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
567             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
568
569             // Set the leader election timeout low for the test.
570
571             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
572
573             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
574
575             // Create the write Tx.
576
577             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
578                 dataStore.newReadWriteTransaction();
579             assertNotNull("newReadWriteTransaction returned null", writeTx);
580
581             // Do some modifications and ready the Tx on a separate thread.
582
583             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
584             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
585             final CountDownLatch txReady = new CountDownLatch(1);
586             Thread txThread = new Thread() {
587                 @Override
588                 public void run() {
589                     try {
590                         writeTx.write(TestModel.JUNK_PATH,
591                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
592
593                         txCohort.set(writeTx.ready());
594                     } catch(Exception e) {
595                         caughtEx.set(e);
596                         return;
597                     } finally {
598                         txReady.countDown();
599                     }
600                 }
601             };
602
603             txThread.start();
604
605             // Wait for the Tx operations to complete.
606
607             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
608             if(caughtEx.get() != null) {
609                 throw caughtEx.get();
610             }
611
612             assertEquals("Tx ready", true, done);
613
614             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
615             // should have timed out and throw an appropriate exception cause.
616
617             try {
618                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
619             } catch(ExecutionException e) {
620                 throw e.getCause();
621             } finally {
622                 cleanup(dataStore);
623             }
624         }};
625     }
626
627     @Test(expected=NoShardLeaderException.class)
628     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
629         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
630         testTransactionCommitFailureWithNoShardLeader(true);
631     }
632
633     @Test(expected=NoShardLeaderException.class)
634     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
635         testTransactionCommitFailureWithNoShardLeader(false);
636     }
637
638     @Test
639     public void testTransactionAbort() throws Exception{
640         System.setProperty("shard.persistent", "true");
641         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
642             DistributedDataStore dataStore =
643                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
644
645             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
646             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
647
648             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
649
650             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
651
652             cohort.canCommit().get(5, TimeUnit.SECONDS);
653
654             cohort.abort().get(5, TimeUnit.SECONDS);
655
656             testWriteTransaction(dataStore, TestModel.TEST_PATH,
657                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
658
659             cleanup(dataStore);
660         }};
661     }
662
663     @Test
664     public void testTransactionChainWithSingleShard() throws Exception{
665         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
666             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
667
668             // 1. Create a Tx chain and write-only Tx
669
670             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
671
672             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
673             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
674
675             // 2. Write some data
676
677             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
678             writeTx.write(TestModel.TEST_PATH, testNode);
679
680             // 3. Ready the Tx for commit
681
682             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
683
684             // 4. Commit the Tx on another thread that first waits for the second read Tx.
685
686             final CountDownLatch continueCommit1 = new CountDownLatch(1);
687             final CountDownLatch commit1Done = new CountDownLatch(1);
688             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
689             new Thread() {
690                 @Override
691                 public void run() {
692                     try {
693                         continueCommit1.await();
694                         doCommit(cohort1);
695                     } catch (Exception e) {
696                         commit1Error.set(e);
697                     } finally {
698                         commit1Done.countDown();
699                     }
700                 }
701             }.start();
702
703             // 5. Create a new read Tx from the chain to read and verify the data from the first
704             // Tx is visible after being readied.
705
706             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
707             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
708             assertEquals("isPresent", true, optional.isPresent());
709             assertEquals("Data node", testNode, optional.get());
710
711             // 6. Create a new RW Tx from the chain, write more data, and ready it
712
713             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
714             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
715             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
716
717             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
718
719             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
720             // verify it is visible.
721
722             readTx = txChain.newReadWriteTransaction();
723             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
724             assertEquals("isPresent", true, optional.isPresent());
725             assertEquals("Data node", outerNode, optional.get());
726
727             // 8. Wait for the 2 commits to complete and close the chain.
728
729             continueCommit1.countDown();
730             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
731
732             if(commit1Error.get() != null) {
733                 throw commit1Error.get();
734             }
735
736             doCommit(cohort2);
737
738             txChain.close();
739
740             // 9. Create a new read Tx from the data store and verify committed data.
741
742             readTx = dataStore.newReadOnlyTransaction();
743             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
744             assertEquals("isPresent", true, optional.isPresent());
745             assertEquals("Data node", outerNode, optional.get());
746
747             cleanup(dataStore);
748         }};
749     }
750
751     @Test
752     public void testTransactionChainWithMultipleShards() throws Exception{
753         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
754             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
755                     "cars-1", "people-1");
756
757             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
758
759             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
760             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
761
762             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
763             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
764
765             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
766             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
767
768             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
769
770             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
771
772             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
773             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
774             readWriteTx.write(carPath, car);
775
776             MapEntryNode person = PeopleModel.newPersonEntry("jack");
777             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
778             readWriteTx.merge(personPath, person);
779
780             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
781             assertEquals("isPresent", true, optional.isPresent());
782             assertEquals("Data node", car, optional.get());
783
784             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
785             assertEquals("isPresent", true, optional.isPresent());
786             assertEquals("Data node", person, optional.get());
787
788             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
789
790             writeTx = txChain.newWriteOnlyTransaction();
791
792             writeTx.delete(carPath);
793
794             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
795
796             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
797             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
798
799             doCommit(canCommit1, cohort1);
800             doCommit(canCommit2, cohort2);
801             doCommit(cohort3);
802
803             txChain.close();
804
805             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
806
807             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
808             assertEquals("isPresent", false, optional.isPresent());
809
810             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
811             assertEquals("isPresent", true, optional.isPresent());
812             assertEquals("Data node", person, optional.get());
813
814             cleanup(dataStore);
815         }};
816     }
817
818     @Test
819     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
820         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
821             DistributedDataStore dataStore = setupDistributedDataStore(
822                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
823
824             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
825
826             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
827
828             int nTxs = 20;
829             List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
830             for(int i = 0; i < nTxs; i++) {
831                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
832
833                 rwTx.merge(TestModel.TEST_PATH, testNode);
834
835                 cohorts.add(rwTx.ready());
836
837             }
838
839             for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
840                 doCommit(cohort);
841             }
842
843             txChain.close();
844
845             cleanup(dataStore);
846         }};
847     }
848
849     @Test
850     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
851         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
852             DistributedDataStore dataStore = setupDistributedDataStore(
853                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
854
855             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
856
857             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
858
859             rwTx1.ready();
860
861             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
862
863             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
864             assertEquals("isPresent", false, optional.isPresent());
865
866             txChain.close();
867
868             cleanup(dataStore);
869         }};
870     }
871
872     @Test
873     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
874         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
875             DistributedDataStore dataStore = setupDistributedDataStore(
876                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
877
878             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
879
880             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
881             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
882
883             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
884
885             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
886             // readied.
887
888             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
889         }};
890     }
891
892     @Test
893     public void testCreateChainedTransactionAfterClose() throws Throwable {
894         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
895             DistributedDataStore dataStore = setupDistributedDataStore(
896                     "testCreateChainedTransactionAfterClose", "test-1");
897
898             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
899
900             txChain.close();
901
902             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
903
904             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
905         }};
906     }
907
908     @Test
909     public void testChangeListenerRegistration() throws Exception{
910         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
911             DistributedDataStore dataStore =
912                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
913
914             testWriteTransaction(dataStore, TestModel.TEST_PATH,
915                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
916
917             MockDataChangeListener listener = new MockDataChangeListener(1);
918
919             ListenerRegistration<MockDataChangeListener>
920                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
921                             DataChangeScope.SUBTREE);
922
923             assertNotNull("registerChangeListener returned null", listenerReg);
924
925             // Wait for the initial notification
926
927             listener.waitForChangeEvents(TestModel.TEST_PATH);
928
929             listener.reset(2);
930
931             // Write 2 updates.
932
933             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
934                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
935
936             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
937                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
938             testWriteTransaction(dataStore, listPath,
939                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
940
941             // Wait for the 2 updates.
942
943             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
944
945             listenerReg.close();
946
947             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
948                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
949                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
950
951             listener.expectNoMoreChanges("Received unexpected change after close");
952
953             cleanup(dataStore);
954         }};
955     }
956 }