f3d93b896de082534f1ed4e8879117226b3952a6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Uninterruptibles;
13 import com.typesafe.config.ConfigFactory;
14 import java.io.IOException;
15 import java.math.BigInteger;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.junit.AfterClass;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
27 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
28 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
29 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
30 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
31 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
32 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46
47 public class DistributedDataStoreIntegrationTest {
48
49     private static ActorSystem system;
50
51     private final DatastoreContext.Builder datastoreContextBuilder =
52             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
53
54     @BeforeClass
55     public static void setUpClass() throws IOException {
56         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
57         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
58         Cluster.get(system).join(member1Address);
59     }
60
61     @AfterClass
62     public static void tearDownClass() throws IOException {
63         JavaTestKit.shutdownActorSystem(system);
64         system = null;
65     }
66
67     protected ActorSystem getSystem() {
68         return system;
69     }
70
71     @Test
72     public void testWriteTransactionWithSingleShard() throws Exception{
73         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
74             DistributedDataStore dataStore =
75                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
76
77             testWriteTransaction(dataStore, TestModel.TEST_PATH,
78                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
79
80             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
81                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
82
83             cleanup(dataStore);
84         }};
85     }
86
87     @Test
88     public void testWriteTransactionWithMultipleShards() throws Exception{
89         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
90             DistributedDataStore dataStore =
91                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
92
93             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
94             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
95
96             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
97             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
98
99             doCommit(writeTx.ready());
100
101             writeTx = dataStore.newWriteOnlyTransaction();
102
103             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
104             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
105
106             doCommit(writeTx.ready());
107
108             writeTx = dataStore.newWriteOnlyTransaction();
109
110             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
111             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
112             writeTx.write(carPath, car);
113
114             MapEntryNode person = PeopleModel.newPersonEntry("jack");
115             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
116             writeTx.write(personPath, person);
117
118             doCommit(writeTx.ready());
119
120             // Verify the data in the store
121
122             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
123
124             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
125             assertEquals("isPresent", true, optional.isPresent());
126             assertEquals("Data node", car, optional.get());
127
128             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
129             assertEquals("isPresent", true, optional.isPresent());
130             assertEquals("Data node", person, optional.get());
131
132             cleanup(dataStore);
133         }};
134     }
135
136     @Test
137     public void testReadWriteTransactionWithSingleShard() throws Exception{
138         System.setProperty("shard.persistent", "true");
139         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
140             DistributedDataStore dataStore =
141                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
142
143             // 1. Create a read-write Tx
144
145             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
146             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
147
148             // 2. Write some data
149
150             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
151             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
152             readWriteTx.write(nodePath, nodeToWrite );
153
154             // 3. Read the data from Tx
155
156             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
157             assertEquals("exists", true, exists);
158
159             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
160             assertEquals("isPresent", true, optional.isPresent());
161             assertEquals("Data node", nodeToWrite, optional.get());
162
163             // 4. Ready the Tx for commit
164
165             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
166
167             // 5. Commit the Tx
168
169             doCommit(cohort);
170
171             // 6. Verify the data in the store
172
173             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
174
175             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
176             assertEquals("isPresent", true, optional.isPresent());
177             assertEquals("Data node", nodeToWrite, optional.get());
178
179             cleanup(dataStore);
180         }};
181     }
182
183     @Test
184     public void testReadWriteTransactionWithMultipleShards() throws Exception{
185         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
186             DistributedDataStore dataStore =
187                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
188
189             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
190             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
191
192             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
193             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
194
195             doCommit(readWriteTx.ready());
196
197             readWriteTx = dataStore.newReadWriteTransaction();
198
199             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
200             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
201
202             doCommit(readWriteTx.ready());
203
204             readWriteTx = dataStore.newReadWriteTransaction();
205
206             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
207             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
208             readWriteTx.write(carPath, car);
209
210             MapEntryNode person = PeopleModel.newPersonEntry("jack");
211             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
212             readWriteTx.write(personPath, person);
213
214             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
215             assertEquals("exists", true, exists);
216
217             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
218             assertEquals("isPresent", true, optional.isPresent());
219             assertEquals("Data node", car, optional.get());
220
221             doCommit(readWriteTx.ready());
222
223             // Verify the data in the store
224
225             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
226
227             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
228             assertEquals("isPresent", true, optional.isPresent());
229             assertEquals("Data node", car, optional.get());
230
231             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
232             assertEquals("isPresent", true, optional.isPresent());
233             assertEquals("Data node", person, optional.get());
234
235             cleanup(dataStore);
236         }};
237     }
238
239     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
240             final boolean writeOnly) throws Exception {
241         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
242             String shardName = "test-1";
243
244             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
245             // initialized until we create and submit the write the Tx.
246             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
247             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
248             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
249
250             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
251
252             // Create the write Tx
253
254             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
255                     dataStore.newReadWriteTransaction();
256             assertNotNull("newReadWriteTransaction returned null", writeTx);
257
258             // Do some modification operations and ready the Tx on a separate thread.
259
260             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
261                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
262                             TestModel.ID_QNAME, 1).build();
263
264             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
265             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
266             final CountDownLatch txReady = new CountDownLatch(1);
267             Thread txThread = new Thread() {
268                 @Override
269                 public void run() {
270                     try {
271                         writeTx.write(TestModel.TEST_PATH,
272                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
273
274                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
275                                 TestModel.OUTER_LIST_QNAME).build());
276
277                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
278                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
279
280                         writeTx.delete(listEntryPath);
281
282                         txCohort.set(writeTx.ready());
283                     } catch(Exception e) {
284                         caughtEx.set(e);
285                         return;
286                     } finally {
287                         txReady.countDown();
288                     }
289                 }
290             };
291
292             txThread.start();
293
294             // Wait for the Tx operations to complete.
295
296             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
297             if(caughtEx.get() != null) {
298                 throw caughtEx.get();
299             }
300
301             assertEquals("Tx ready", true, done);
302
303             // At this point the Tx operations should be waiting for the shard to initialize so
304             // trigger the latch to let the shard recovery to continue.
305
306             blockRecoveryLatch.countDown();
307
308             // Wait for the Tx commit to complete.
309
310             doCommit(txCohort.get());
311
312             // Verify the data in the store
313
314             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
315
316             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
317                     get(5, TimeUnit.SECONDS);
318             assertEquals("isPresent", true, optional.isPresent());
319
320             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
321             assertEquals("isPresent", true, optional.isPresent());
322
323             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
324             assertEquals("isPresent", false, optional.isPresent());
325
326             cleanup(dataStore);
327         }};
328     }
329
330     @Test
331     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
332         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
333         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
334     }
335
336     @Test
337     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
338         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
339     }
340
341     @Test
342     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
343         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
344             String testName = "testTransactionReadsWithShardNotInitiallyReady";
345             String shardName = "test-1";
346
347             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
348             // initialized until we create the Tx.
349             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
350             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
351             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
352
353             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
354
355             // Create the read-write Tx
356
357             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
358             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
359
360             // Do some reads on the Tx on a separate thread.
361
362             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
363                     new AtomicReference<>();
364             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
365                     txReadFuture = new AtomicReference<>();
366             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
367             final CountDownLatch txReadsDone = new CountDownLatch(1);
368             Thread txThread = new Thread() {
369                 @Override
370                 public void run() {
371                     try {
372                         readWriteTx.write(TestModel.TEST_PATH,
373                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
374
375                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
376
377                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
378                     } catch(Exception e) {
379                         caughtEx.set(e);
380                         return;
381                     } finally {
382                         txReadsDone.countDown();
383                     }
384                 }
385             };
386
387             txThread.start();
388
389             // Wait for the Tx operations to complete.
390
391             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
392             if(caughtEx.get() != null) {
393                 throw caughtEx.get();
394             }
395
396             assertEquals("Tx reads done", true, done);
397
398             // At this point the Tx operations should be waiting for the shard to initialize so
399             // trigger the latch to let the shard recovery to continue.
400
401             blockRecoveryLatch.countDown();
402
403             // Wait for the reads to complete and verify.
404
405             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
406             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
407
408             readWriteTx.close();
409
410             cleanup(dataStore);
411         }};
412     }
413
414     @Test(expected=NotInitializedException.class)
415     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
416         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
417             String testName = "testTransactionCommitFailureWithShardNotInitialized";
418             String shardName = "test-1";
419
420             // Set the shard initialization timeout low for the test.
421
422             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
423
424             // Setup the InMemoryJournal to block shard recovery indefinitely.
425
426             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
427             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
428             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
429
430             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
431
432             // Create the write Tx
433
434             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
435             assertNotNull("newReadWriteTransaction returned null", writeTx);
436
437             // Do some modifications and ready the Tx on a separate thread.
438
439             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
440             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
441             final CountDownLatch txReady = new CountDownLatch(1);
442             Thread txThread = new Thread() {
443                 @Override
444                 public void run() {
445                     try {
446                         writeTx.write(TestModel.TEST_PATH,
447                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
448
449                         txCohort.set(writeTx.ready());
450                     } catch(Exception e) {
451                         caughtEx.set(e);
452                         return;
453                     } finally {
454                         txReady.countDown();
455                     }
456                 }
457             };
458
459             txThread.start();
460
461             // Wait for the Tx operations to complete.
462
463             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
464             if(caughtEx.get() != null) {
465                 throw caughtEx.get();
466             }
467
468             assertEquals("Tx ready", true, done);
469
470             // Wait for the commit to complete. Since the shard never initialized, the Tx should
471             // have timed out and throw an appropriate exception cause.
472
473             try {
474                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
475             } catch(ExecutionException e) {
476                 throw e.getCause();
477             } finally {
478                 blockRecoveryLatch.countDown();
479                 cleanup(dataStore);
480             }
481         }};
482     }
483
484     @Test(expected=NotInitializedException.class)
485     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
486         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
487             String testName = "testTransactionReadFailureWithShardNotInitialized";
488             String shardName = "test-1";
489
490             // Set the shard initialization timeout low for the test.
491
492             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
493
494             // Setup the InMemoryJournal to block shard recovery indefinitely.
495
496             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
497             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
498             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
499
500             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
501
502             // Create the read-write Tx
503
504             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
505             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
506
507             // Do a read on the Tx on a separate thread.
508
509             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
510                     txReadFuture = new AtomicReference<>();
511             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
512             final CountDownLatch txReadDone = new CountDownLatch(1);
513             Thread txThread = new Thread() {
514                 @Override
515                 public void run() {
516                     try {
517                         readWriteTx.write(TestModel.TEST_PATH,
518                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
519
520                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
521
522                         readWriteTx.close();
523                     } catch(Exception e) {
524                         caughtEx.set(e);
525                         return;
526                     } finally {
527                         txReadDone.countDown();
528                     }
529                 }
530             };
531
532             txThread.start();
533
534             // Wait for the Tx operations to complete.
535
536             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
537             if(caughtEx.get() != null) {
538                 throw caughtEx.get();
539             }
540
541             assertEquals("Tx read done", true, done);
542
543             // Wait for the read to complete. Since the shard never initialized, the Tx should
544             // have timed out and throw an appropriate exception cause.
545
546             try {
547                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
548             } catch(ReadFailedException e) {
549                 throw e.getCause();
550             } finally {
551                 blockRecoveryLatch.countDown();
552                 cleanup(dataStore);
553             }
554         }};
555     }
556
557     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
558         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
559             String testName = "testTransactionCommitFailureWithNoShardLeader";
560             String shardName = "default";
561
562             // We don't want the shard to become the leader so prevent shard election from completing
563             // by setting the election timeout, which is based on the heartbeat interval, really high.
564
565             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
566             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
567
568             // Set the leader election timeout low for the test.
569
570             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
571
572             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
573
574             // Create the write Tx.
575
576             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
577                 dataStore.newReadWriteTransaction();
578             assertNotNull("newReadWriteTransaction returned null", writeTx);
579
580             // Do some modifications and ready the Tx on a separate thread.
581
582             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
583             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
584             final CountDownLatch txReady = new CountDownLatch(1);
585             Thread txThread = new Thread() {
586                 @Override
587                 public void run() {
588                     try {
589                         writeTx.write(TestModel.JUNK_PATH,
590                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
591
592                         txCohort.set(writeTx.ready());
593                     } catch(Exception e) {
594                         caughtEx.set(e);
595                         return;
596                     } finally {
597                         txReady.countDown();
598                     }
599                 }
600             };
601
602             txThread.start();
603
604             // Wait for the Tx operations to complete.
605
606             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
607             if(caughtEx.get() != null) {
608                 throw caughtEx.get();
609             }
610
611             assertEquals("Tx ready", true, done);
612
613             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
614             // should have timed out and throw an appropriate exception cause.
615
616             try {
617                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
618             } catch(ExecutionException e) {
619                 throw e.getCause();
620             } finally {
621                 cleanup(dataStore);
622             }
623         }};
624     }
625
626     @Test(expected=NoShardLeaderException.class)
627     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
628         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
629         testTransactionCommitFailureWithNoShardLeader(true);
630     }
631
632     @Test(expected=NoShardLeaderException.class)
633     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
634         testTransactionCommitFailureWithNoShardLeader(false);
635     }
636
637     @Test
638     public void testTransactionAbort() throws Exception{
639         System.setProperty("shard.persistent", "true");
640         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
641             DistributedDataStore dataStore =
642                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
643
644             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
645             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
646
647             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
648
649             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
650
651             cohort.canCommit().get(5, TimeUnit.SECONDS);
652
653             cohort.abort().get(5, TimeUnit.SECONDS);
654
655             testWriteTransaction(dataStore, TestModel.TEST_PATH,
656                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
657
658             cleanup(dataStore);
659         }};
660     }
661
662     @Test
663     public void testTransactionChainWithSingleShard() throws Exception{
664         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
665             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
666
667             // 1. Create a Tx chain and write-only Tx
668
669             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
670
671             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
672             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
673
674             // 2. Write some data
675
676             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
677             writeTx.write(TestModel.TEST_PATH, testNode);
678
679             // 3. Ready the Tx for commit
680
681             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
682
683             // 4. Commit the Tx on another thread that first waits for the second read Tx.
684
685             final CountDownLatch continueCommit1 = new CountDownLatch(1);
686             final CountDownLatch commit1Done = new CountDownLatch(1);
687             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
688             new Thread() {
689                 @Override
690                 public void run() {
691                     try {
692                         continueCommit1.await();
693                         doCommit(cohort1);
694                     } catch (Exception e) {
695                         commit1Error.set(e);
696                     } finally {
697                         commit1Done.countDown();
698                     }
699                 }
700             }.start();
701
702             // 5. Create a new read Tx from the chain to read and verify the data from the first
703             // Tx is visible after being readied.
704
705             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
706             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
707             assertEquals("isPresent", true, optional.isPresent());
708             assertEquals("Data node", testNode, optional.get());
709
710             // 6. Create a new RW Tx from the chain, write more data, and ready it
711
712             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
713             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
714             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
715
716             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
717
718             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
719             // verify it is visible.
720
721             readTx = txChain.newReadWriteTransaction();
722             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
723             assertEquals("isPresent", true, optional.isPresent());
724             assertEquals("Data node", outerNode, optional.get());
725
726             // 8. Wait for the 2 commits to complete and close the chain.
727
728             continueCommit1.countDown();
729             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
730
731             if(commit1Error.get() != null) {
732                 throw commit1Error.get();
733             }
734
735             doCommit(cohort2);
736
737             txChain.close();
738
739             // 9. Create a new read Tx from the data store and verify committed data.
740
741             readTx = dataStore.newReadOnlyTransaction();
742             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
743             assertEquals("isPresent", true, optional.isPresent());
744             assertEquals("Data node", outerNode, optional.get());
745
746             cleanup(dataStore);
747         }};
748     }
749
750     @Test
751     public void testTransactionChainWithMultipleShards() throws Exception{
752         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
753             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
754                     "cars-1", "people-1");
755
756             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
757
758             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
759             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
760
761             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
762             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
763
764             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
765             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
766
767             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
768
769             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
770
771             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
772             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
773             readWriteTx.write(carPath, car);
774
775             MapEntryNode person = PeopleModel.newPersonEntry("jack");
776             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
777             readWriteTx.merge(personPath, person);
778
779             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
780             assertEquals("isPresent", true, optional.isPresent());
781             assertEquals("Data node", car, optional.get());
782
783             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
784             assertEquals("isPresent", true, optional.isPresent());
785             assertEquals("Data node", person, optional.get());
786
787             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
788
789             writeTx = txChain.newWriteOnlyTransaction();
790
791             //writeTx.delete(personPath);
792
793             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
794
795             doCommit(cohort1);
796             doCommit(cohort2);
797             doCommit(cohort3);
798
799             txChain.close();
800
801             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
802
803             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
804             assertEquals("isPresent", true, optional.isPresent());
805             assertEquals("Data node", car, optional.get());
806
807             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
808             //assertEquals("isPresent", false, optional.isPresent());
809             assertEquals("isPresent", true, optional.isPresent());
810
811             cleanup(dataStore);
812         }};
813     }
814
815     @Test
816     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
817         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
818             DistributedDataStore dataStore = setupDistributedDataStore(
819                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
820
821             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
822
823             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
824
825             int nTxs = 20;
826             List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
827             for(int i = 0; i < nTxs; i++) {
828                 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
829
830                 rwTx.merge(TestModel.TEST_PATH, testNode);
831
832                 cohorts.add(rwTx.ready());
833
834             }
835
836             for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
837                 doCommit(cohort);
838             }
839
840             txChain.close();
841
842             cleanup(dataStore);
843         }};
844     }
845
846     @Test
847     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
848         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
849             DistributedDataStore dataStore = setupDistributedDataStore(
850                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
851
852             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
853
854             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
855
856             rwTx1.ready();
857
858             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
859
860             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
861             assertEquals("isPresent", false, optional.isPresent());
862
863             txChain.close();
864
865             cleanup(dataStore);
866         }};
867     }
868
869     @Test
870     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
871         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
872             DistributedDataStore dataStore = setupDistributedDataStore(
873                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
874
875             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
876
877             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
878             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
879
880             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
881
882             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
883             // readied.
884
885             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
886         }};
887     }
888
889     @Test
890     public void testCreateChainedTransactionAfterClose() throws Throwable {
891         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
892             DistributedDataStore dataStore = setupDistributedDataStore(
893                     "testCreateChainedTransactionAfterClose", "test-1");
894
895             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
896
897             txChain.close();
898
899             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
900
901             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
902         }};
903     }
904
905     @Test
906     public void testChangeListenerRegistration() throws Exception{
907         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
908             DistributedDataStore dataStore =
909                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
910
911             testWriteTransaction(dataStore, TestModel.TEST_PATH,
912                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
913
914             MockDataChangeListener listener = new MockDataChangeListener(1);
915
916             ListenerRegistration<MockDataChangeListener>
917                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
918                             DataChangeScope.SUBTREE);
919
920             assertNotNull("registerChangeListener returned null", listenerReg);
921
922             // Wait for the initial notification
923
924             listener.waitForChangeEvents(TestModel.TEST_PATH);
925
926             listener.reset(2);
927
928             // Write 2 updates.
929
930             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
931                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
932
933             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
934                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
935             testWriteTransaction(dataStore, listPath,
936                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
937
938             // Wait for the 2 updates.
939
940             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
941
942             listenerReg.close();
943
944             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
945                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
946                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
947
948             listener.expectNoMoreChanges("Received unexpected change after close");
949
950             cleanup(dataStore);
951         }};
952     }
953 }