Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.fail;
6 import static org.mockito.Matchers.any;
7 import static org.mockito.Matchers.eq;
8 import static org.mockito.Mockito.timeout;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.AddressFromURIString;
13 import akka.cluster.Cluster;
14 import akka.testkit.JavaTestKit;
15 import com.google.common.base.Optional;
16 import com.google.common.collect.ImmutableMap;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import com.typesafe.config.ConfigFactory;
22 import java.io.IOException;
23 import java.math.BigInteger;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.AfterClass;
32 import org.junit.BeforeClass;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
37 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
38 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
39 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
40 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
43 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
44 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
45 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
47 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
48 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
49 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
50 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.concepts.ListenerRegistration;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
63 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
64 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
65
66 public class DistributedDataStoreIntegrationTest {
67
68     private static ActorSystem system;
69
70     private final DatastoreContext.Builder datastoreContextBuilder =
71             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
72
73     @BeforeClass
74     public static void setUpClass() throws IOException {
75         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
76         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
77         Cluster.get(system).join(member1Address);
78     }
79
80     @AfterClass
81     public static void tearDownClass() throws IOException {
82         JavaTestKit.shutdownActorSystem(system);
83         system = null;
84     }
85
86     protected ActorSystem getSystem() {
87         return system;
88     }
89
90     @Test
91     public void testWriteTransactionWithSingleShard() throws Exception{
92         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
93             DistributedDataStore dataStore =
94                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
95
96             testWriteTransaction(dataStore, TestModel.TEST_PATH,
97                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
98
99             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
100                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
101
102             cleanup(dataStore);
103         }};
104     }
105
106     @Test
107     public void testWriteTransactionWithMultipleShards() throws Exception{
108         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
109             DistributedDataStore dataStore =
110                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
111
112             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
113             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
114
115             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
116             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
117
118             doCommit(writeTx.ready());
119
120             writeTx = dataStore.newWriteOnlyTransaction();
121
122             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
123             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
124
125             doCommit(writeTx.ready());
126
127             writeTx = dataStore.newWriteOnlyTransaction();
128
129             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
130             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
131             writeTx.write(carPath, car);
132
133             MapEntryNode person = PeopleModel.newPersonEntry("jack");
134             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
135             writeTx.write(personPath, person);
136
137             doCommit(writeTx.ready());
138
139             // Verify the data in the store
140
141             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
142
143             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
144             assertEquals("isPresent", true, optional.isPresent());
145             assertEquals("Data node", car, optional.get());
146
147             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
148             assertEquals("isPresent", true, optional.isPresent());
149             assertEquals("Data node", person, optional.get());
150
151             cleanup(dataStore);
152         }};
153     }
154
155     @Test
156     public void testReadWriteTransactionWithSingleShard() throws Exception{
157         System.setProperty("shard.persistent", "true");
158         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
159             DistributedDataStore dataStore =
160                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
161
162             // 1. Create a read-write Tx
163
164             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
165             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
166
167             // 2. Write some data
168
169             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
170             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
171             readWriteTx.write(nodePath, nodeToWrite );
172
173             // 3. Read the data from Tx
174
175             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
176             assertEquals("exists", true, exists);
177
178             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
179             assertEquals("isPresent", true, optional.isPresent());
180             assertEquals("Data node", nodeToWrite, optional.get());
181
182             // 4. Ready the Tx for commit
183
184             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
185
186             // 5. Commit the Tx
187
188             doCommit(cohort);
189
190             // 6. Verify the data in the store
191
192             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
193
194             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
195             assertEquals("isPresent", true, optional.isPresent());
196             assertEquals("Data node", nodeToWrite, optional.get());
197
198             cleanup(dataStore);
199         }};
200     }
201
202     @Test
203     public void testReadWriteTransactionWithMultipleShards() throws Exception{
204         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
205             DistributedDataStore dataStore =
206                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
207
208             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
209             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
210
211             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
212             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
213
214             doCommit(readWriteTx.ready());
215
216             readWriteTx = dataStore.newReadWriteTransaction();
217
218             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
219             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
220
221             doCommit(readWriteTx.ready());
222
223             readWriteTx = dataStore.newReadWriteTransaction();
224
225             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
226             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
227             readWriteTx.write(carPath, car);
228
229             MapEntryNode person = PeopleModel.newPersonEntry("jack");
230             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
231             readWriteTx.write(personPath, person);
232
233             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
234             assertEquals("exists", true, exists);
235
236             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
237             assertEquals("isPresent", true, optional.isPresent());
238             assertEquals("Data node", car, optional.get());
239
240             doCommit(readWriteTx.ready());
241
242             // Verify the data in the store
243
244             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
245
246             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
247             assertEquals("isPresent", true, optional.isPresent());
248             assertEquals("Data node", car, optional.get());
249
250             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
251             assertEquals("isPresent", true, optional.isPresent());
252             assertEquals("Data node", person, optional.get());
253
254             cleanup(dataStore);
255         }};
256     }
257
258     @Test
259     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
260         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
261             DistributedDataStore dataStore = setupDistributedDataStore(
262                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
263
264             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
265
266             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
267             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
268             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
269             doCommit(writeTx.ready());
270
271             writeTx = txChain.newWriteOnlyTransaction();
272
273             int nCars = 5;
274             for(int i = 0; i < nCars; i++) {
275                 writeTx.write(CarsModel.newCarPath("car" + i),
276                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
277             }
278
279             doCommit(writeTx.ready());
280
281             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
282                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
283             assertEquals("isPresent", true, optional.isPresent());
284             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
285
286             cleanup(dataStore);
287         }};
288     }
289
290     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
291             final boolean writeOnly) throws Exception {
292         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
293             String shardName = "test-1";
294
295             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
296             // initialized until we create and submit the write the Tx.
297             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
298             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
299             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
300
301             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
302
303             // Create the write Tx
304
305             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
306                     dataStore.newReadWriteTransaction();
307             assertNotNull("newReadWriteTransaction returned null", writeTx);
308
309             // Do some modification operations and ready the Tx on a separate thread.
310
311             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
312                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
313                             TestModel.ID_QNAME, 1).build();
314
315             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
316             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
317             final CountDownLatch txReady = new CountDownLatch(1);
318             Thread txThread = new Thread() {
319                 @Override
320                 public void run() {
321                     try {
322                         writeTx.write(TestModel.TEST_PATH,
323                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
324
325                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
326                                 TestModel.OUTER_LIST_QNAME).build());
327
328                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
329                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
330
331                         writeTx.delete(listEntryPath);
332
333                         txCohort.set(writeTx.ready());
334                     } catch(Exception e) {
335                         caughtEx.set(e);
336                         return;
337                     } finally {
338                         txReady.countDown();
339                     }
340                 }
341             };
342
343             txThread.start();
344
345             // Wait for the Tx operations to complete.
346
347             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
348             if(caughtEx.get() != null) {
349                 throw caughtEx.get();
350             }
351
352             assertEquals("Tx ready", true, done);
353
354             // At this point the Tx operations should be waiting for the shard to initialize so
355             // trigger the latch to let the shard recovery to continue.
356
357             blockRecoveryLatch.countDown();
358
359             // Wait for the Tx commit to complete.
360
361             doCommit(txCohort.get());
362
363             // Verify the data in the store
364
365             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
366
367             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
368                     get(5, TimeUnit.SECONDS);
369             assertEquals("isPresent", true, optional.isPresent());
370
371             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
372             assertEquals("isPresent", true, optional.isPresent());
373
374             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
375             assertEquals("isPresent", false, optional.isPresent());
376
377             cleanup(dataStore);
378         }};
379     }
380
381     @Test
382     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
383         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
384         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
385     }
386
387     @Test
388     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
389         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
390     }
391
392     @Test
393     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
394         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
395             String testName = "testTransactionReadsWithShardNotInitiallyReady";
396             String shardName = "test-1";
397
398             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
399             // initialized until we create the Tx.
400             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
401             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
402             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
403
404             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
405
406             // Create the read-write Tx
407
408             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
409             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
410
411             // Do some reads on the Tx on a separate thread.
412
413             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
414                     new AtomicReference<>();
415             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
416                     txReadFuture = new AtomicReference<>();
417             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
418             final CountDownLatch txReadsDone = new CountDownLatch(1);
419             Thread txThread = new Thread() {
420                 @Override
421                 public void run() {
422                     try {
423                         readWriteTx.write(TestModel.TEST_PATH,
424                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
425
426                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
427
428                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
429                     } catch(Exception e) {
430                         caughtEx.set(e);
431                         return;
432                     } finally {
433                         txReadsDone.countDown();
434                     }
435                 }
436             };
437
438             txThread.start();
439
440             // Wait for the Tx operations to complete.
441
442             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
443             if(caughtEx.get() != null) {
444                 throw caughtEx.get();
445             }
446
447             assertEquals("Tx reads done", true, done);
448
449             // At this point the Tx operations should be waiting for the shard to initialize so
450             // trigger the latch to let the shard recovery to continue.
451
452             blockRecoveryLatch.countDown();
453
454             // Wait for the reads to complete and verify.
455
456             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
457             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
458
459             readWriteTx.close();
460
461             cleanup(dataStore);
462         }};
463     }
464
465     @Test(expected=NotInitializedException.class)
466     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
467         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
468             String testName = "testTransactionCommitFailureWithShardNotInitialized";
469             String shardName = "test-1";
470
471             // Set the shard initialization timeout low for the test.
472
473             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
474
475             // Setup the InMemoryJournal to block shard recovery indefinitely.
476
477             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
478             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
479             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
480
481             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
482
483             // Create the write Tx
484
485             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
486             assertNotNull("newReadWriteTransaction returned null", writeTx);
487
488             // Do some modifications and ready the Tx on a separate thread.
489
490             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
491             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
492             final CountDownLatch txReady = new CountDownLatch(1);
493             Thread txThread = new Thread() {
494                 @Override
495                 public void run() {
496                     try {
497                         writeTx.write(TestModel.TEST_PATH,
498                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
499
500                         txCohort.set(writeTx.ready());
501                     } catch(Exception e) {
502                         caughtEx.set(e);
503                         return;
504                     } finally {
505                         txReady.countDown();
506                     }
507                 }
508             };
509
510             txThread.start();
511
512             // Wait for the Tx operations to complete.
513
514             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
515             if(caughtEx.get() != null) {
516                 throw caughtEx.get();
517             }
518
519             assertEquals("Tx ready", true, done);
520
521             // Wait for the commit to complete. Since the shard never initialized, the Tx should
522             // have timed out and throw an appropriate exception cause.
523
524             try {
525                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
526             } catch(ExecutionException e) {
527                 throw e.getCause();
528             } finally {
529                 blockRecoveryLatch.countDown();
530                 cleanup(dataStore);
531             }
532         }};
533     }
534
535     @Test(expected=NotInitializedException.class)
536     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
537         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
538             String testName = "testTransactionReadFailureWithShardNotInitialized";
539             String shardName = "test-1";
540
541             // Set the shard initialization timeout low for the test.
542
543             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
544
545             // Setup the InMemoryJournal to block shard recovery indefinitely.
546
547             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
548             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
549             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
550
551             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
552
553             // Create the read-write Tx
554
555             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
556             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
557
558             // Do a read on the Tx on a separate thread.
559
560             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
561                     txReadFuture = new AtomicReference<>();
562             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
563             final CountDownLatch txReadDone = new CountDownLatch(1);
564             Thread txThread = new Thread() {
565                 @Override
566                 public void run() {
567                     try {
568                         readWriteTx.write(TestModel.TEST_PATH,
569                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
570
571                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
572
573                         readWriteTx.close();
574                     } catch(Exception e) {
575                         caughtEx.set(e);
576                         return;
577                     } finally {
578                         txReadDone.countDown();
579                     }
580                 }
581             };
582
583             txThread.start();
584
585             // Wait for the Tx operations to complete.
586
587             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
588             if(caughtEx.get() != null) {
589                 throw caughtEx.get();
590             }
591
592             assertEquals("Tx read done", true, done);
593
594             // Wait for the read to complete. Since the shard never initialized, the Tx should
595             // have timed out and throw an appropriate exception cause.
596
597             try {
598                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
599             } catch(ReadFailedException e) {
600                 throw e.getCause();
601             } finally {
602                 blockRecoveryLatch.countDown();
603                 cleanup(dataStore);
604             }
605         }};
606     }
607
608     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
609         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
610             String testName = "testTransactionCommitFailureWithNoShardLeader";
611             String shardName = "default";
612
613             // We don't want the shard to become the leader so prevent shard election from completing
614             // by setting the election timeout, which is based on the heartbeat interval, really high.
615
616             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
617             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
618
619             // Set the leader election timeout low for the test.
620
621             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
622
623             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
624
625             // Create the write Tx.
626
627             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
628                 dataStore.newReadWriteTransaction();
629             assertNotNull("newReadWriteTransaction returned null", writeTx);
630
631             // Do some modifications and ready the Tx on a separate thread.
632
633             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
634             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
635             final CountDownLatch txReady = new CountDownLatch(1);
636             Thread txThread = new Thread() {
637                 @Override
638                 public void run() {
639                     try {
640                         writeTx.write(TestModel.JUNK_PATH,
641                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
642
643                         txCohort.set(writeTx.ready());
644                     } catch(Exception e) {
645                         caughtEx.set(e);
646                         return;
647                     } finally {
648                         txReady.countDown();
649                     }
650                 }
651             };
652
653             txThread.start();
654
655             // Wait for the Tx operations to complete.
656
657             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
658             if(caughtEx.get() != null) {
659                 throw caughtEx.get();
660             }
661
662             assertEquals("Tx ready", true, done);
663
664             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
665             // should have timed out and throw an appropriate exception cause.
666
667             try {
668                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
669             } catch(ExecutionException e) {
670                 throw e.getCause();
671             } finally {
672                 cleanup(dataStore);
673             }
674         }};
675     }
676
677     @Test(expected=NoShardLeaderException.class)
678     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
679         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
680         testTransactionCommitFailureWithNoShardLeader(true);
681     }
682
683     @Test(expected=NoShardLeaderException.class)
684     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
685         testTransactionCommitFailureWithNoShardLeader(false);
686     }
687
688     @Test
689     public void testTransactionAbort() throws Exception{
690         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
691             DistributedDataStore dataStore =
692                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
693
694             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
695             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
696
697             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
698
699             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
700
701             cohort.canCommit().get(5, TimeUnit.SECONDS);
702
703             cohort.abort().get(5, TimeUnit.SECONDS);
704
705             testWriteTransaction(dataStore, TestModel.TEST_PATH,
706                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
707
708             cleanup(dataStore);
709         }};
710     }
711
712     @Test
713     public void testTransactionChainWithSingleShard() throws Exception{
714         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
715             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
716
717             // 1. Create a Tx chain and write-only Tx
718
719             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
720
721             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
722             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
723
724             // 2. Write some data
725
726             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
727             writeTx.write(TestModel.TEST_PATH, testNode);
728
729             // 3. Ready the Tx for commit
730
731             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
732
733             // 4. Commit the Tx on another thread that first waits for the second read Tx.
734
735             final CountDownLatch continueCommit1 = new CountDownLatch(1);
736             final CountDownLatch commit1Done = new CountDownLatch(1);
737             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
738             new Thread() {
739                 @Override
740                 public void run() {
741                     try {
742                         continueCommit1.await();
743                         doCommit(cohort1);
744                     } catch (Exception e) {
745                         commit1Error.set(e);
746                     } finally {
747                         commit1Done.countDown();
748                     }
749                 }
750             }.start();
751
752             // 5. Create a new read Tx from the chain to read and verify the data from the first
753             // Tx is visible after being readied.
754
755             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
756             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
757             assertEquals("isPresent", true, optional.isPresent());
758             assertEquals("Data node", testNode, optional.get());
759
760             // 6. Create a new RW Tx from the chain, write more data, and ready it
761
762             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
763             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
764             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
765
766             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
767
768             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
769             // verify it is visible.
770
771             readTx = txChain.newReadWriteTransaction();
772             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
773             assertEquals("isPresent", true, optional.isPresent());
774             assertEquals("Data node", outerNode, optional.get());
775
776             // 8. Wait for the 2 commits to complete and close the chain.
777
778             continueCommit1.countDown();
779             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
780
781             if(commit1Error.get() != null) {
782                 throw commit1Error.get();
783             }
784
785             doCommit(cohort2);
786
787             txChain.close();
788
789             // 9. Create a new read Tx from the data store and verify committed data.
790
791             readTx = dataStore.newReadOnlyTransaction();
792             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
793             assertEquals("isPresent", true, optional.isPresent());
794             assertEquals("Data node", outerNode, optional.get());
795
796             cleanup(dataStore);
797         }};
798     }
799
800     @Test
801     public void testTransactionChainWithMultipleShards() throws Exception{
802         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
803             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
804                     "cars-1", "people-1");
805
806             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
807
808             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
809             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
810
811             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
812             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
813
814             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
815             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
816
817             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
818
819             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
820
821             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
822             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
823             readWriteTx.write(carPath, car);
824
825             MapEntryNode person = PeopleModel.newPersonEntry("jack");
826             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
827             readWriteTx.merge(personPath, person);
828
829             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
830             assertEquals("isPresent", true, optional.isPresent());
831             assertEquals("Data node", car, optional.get());
832
833             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
834             assertEquals("isPresent", true, optional.isPresent());
835             assertEquals("Data node", person, optional.get());
836
837             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
838
839             writeTx = txChain.newWriteOnlyTransaction();
840
841             writeTx.delete(carPath);
842
843             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
844
845             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
846             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
847
848             doCommit(canCommit1, cohort1);
849             doCommit(canCommit2, cohort2);
850             doCommit(cohort3);
851
852             txChain.close();
853
854             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
855
856             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
857             assertEquals("isPresent", false, optional.isPresent());
858
859             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
860             assertEquals("isPresent", true, optional.isPresent());
861             assertEquals("Data node", person, optional.get());
862
863             cleanup(dataStore);
864         }};
865     }
866
867     @Test
868     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
869         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
870             DistributedDataStore dataStore = setupDistributedDataStore(
871                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
872
873             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
874                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
875                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
876
877             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
878             DOMTransactionChain txChain = broker.createTransactionChain(listener);
879
880             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
881
882             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
883             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
884             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
885             futures.add(writeTx.submit());
886
887             int nCars = 100;
888             for(int i = 0; i < nCars; i++) {
889                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
890
891                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
892                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
893
894                 futures.add(rwTx.submit());
895             }
896
897             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
898                 f.checkedGet();
899             }
900
901             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
902                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
903             assertEquals("isPresent", true, optional.isPresent());
904             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
905
906             txChain.close();
907
908             broker.close();
909
910             cleanup(dataStore);
911         }};
912     }
913
914     @Test
915     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
916         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
917             DistributedDataStore dataStore = setupDistributedDataStore(
918                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
919
920             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
921
922             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
923
924             rwTx1.ready();
925
926             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
927
928             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
929             assertEquals("isPresent", false, optional.isPresent());
930
931             txChain.close();
932
933             cleanup(dataStore);
934         }};
935     }
936
937     @Test
938     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
939         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
940             DistributedDataStore dataStore = setupDistributedDataStore(
941                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
942
943             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
944
945             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
946             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
947
948             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
949
950             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
951             // readied.
952
953             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
954         }};
955     }
956
957     @Test
958     public void testCreateChainedTransactionAfterClose() throws Throwable {
959         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
960             DistributedDataStore dataStore = setupDistributedDataStore(
961                     "testCreateChainedTransactionAfterClose", "test-1");
962
963             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
964
965             txChain.close();
966
967             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
968
969             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
970         }};
971     }
972
973     @Test
974     public void testChainedTransactionFailureWithSingleShard() throws Exception{
975         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
976             DistributedDataStore dataStore = setupDistributedDataStore(
977                     "testChainedTransactionFailureWithSingleShard", "cars-1");
978
979             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
980                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
981                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
982
983             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
984             DOMTransactionChain txChain = broker.createTransactionChain(listener);
985
986             DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
987
988             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
989                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
990                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
991
992             rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
993
994             try {
995                 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
996                 fail("Expected TransactionCommitFailedException");
997             } catch (TransactionCommitFailedException e) {
998                 // Expected
999             }
1000
1001             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1002
1003             txChain.close();
1004             broker.close();
1005             cleanup(dataStore);
1006         }};
1007     }
1008
1009     @Test
1010     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1011         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1012             DistributedDataStore dataStore = setupDistributedDataStore(
1013                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1014
1015             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1016                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1017                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1018
1019             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1020             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1021
1022             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1023
1024             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1025
1026             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1027                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1028                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1029
1030             // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1031             // done for put for performance reasons.
1032             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1033
1034             try {
1035                 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1036                 fail("Expected TransactionCommitFailedException");
1037             } catch (TransactionCommitFailedException e) {
1038                 // Expected
1039             }
1040
1041             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1042
1043             txChain.close();
1044             broker.close();
1045             cleanup(dataStore);
1046         }};
1047     }
1048
1049     @Test
1050     public void testChangeListenerRegistration() throws Exception{
1051         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1052             DistributedDataStore dataStore =
1053                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1054
1055             testWriteTransaction(dataStore, TestModel.TEST_PATH,
1056                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1057
1058             MockDataChangeListener listener = new MockDataChangeListener(1);
1059
1060             ListenerRegistration<MockDataChangeListener>
1061                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1062                             DataChangeScope.SUBTREE);
1063
1064             assertNotNull("registerChangeListener returned null", listenerReg);
1065
1066             // Wait for the initial notification
1067
1068             listener.waitForChangeEvents(TestModel.TEST_PATH);
1069
1070             listener.reset(2);
1071
1072             // Write 2 updates.
1073
1074             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1075                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1076
1077             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1078                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1079             testWriteTransaction(dataStore, listPath,
1080                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1081
1082             // Wait for the 2 updates.
1083
1084             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1085
1086             listenerReg.close();
1087
1088             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1089                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1090                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1091
1092             listener.expectNoMoreChanges("Received unexpected change after close");
1093
1094             cleanup(dataStore);
1095         }};
1096     }
1097 }