Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import akka.actor.ActorSystem;
6 import akka.actor.Address;
7 import akka.actor.AddressFromURIString;
8 import akka.cluster.Cluster;
9 import akka.testkit.JavaTestKit;
10 import com.google.common.base.Optional;
11 import com.google.common.collect.ImmutableMap;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.Uninterruptibles;
16 import com.typesafe.config.ConfigFactory;
17 import java.io.IOException;
18 import java.math.BigInteger;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicReference;
26 import org.junit.AfterClass;
27 import org.junit.BeforeClass;
28 import org.junit.Test;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
31 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
32 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
33 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
34 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
35 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
36 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
40 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
46 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
47 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
58
59 public class DistributedDataStoreIntegrationTest {
60
61     private static ActorSystem system;
62
63     private final DatastoreContext.Builder datastoreContextBuilder =
64             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
65
66     @BeforeClass
67     public static void setUpClass() throws IOException {
68         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
69         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
70         Cluster.get(system).join(member1Address);
71     }
72
73     @AfterClass
74     public static void tearDownClass() throws IOException {
75         JavaTestKit.shutdownActorSystem(system);
76         system = null;
77     }
78
79     protected ActorSystem getSystem() {
80         return system;
81     }
82
83     @Test
84     public void testWriteTransactionWithSingleShard() throws Exception{
85         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
86             DistributedDataStore dataStore =
87                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
88
89             testWriteTransaction(dataStore, TestModel.TEST_PATH,
90                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
91
92             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
93                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
94
95             cleanup(dataStore);
96         }};
97     }
98
99     @Test
100     public void testWriteTransactionWithMultipleShards() throws Exception{
101         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
102             DistributedDataStore dataStore =
103                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
104
105             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
106             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
107
108             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
109             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
110
111             doCommit(writeTx.ready());
112
113             writeTx = dataStore.newWriteOnlyTransaction();
114
115             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
116             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
117
118             doCommit(writeTx.ready());
119
120             writeTx = dataStore.newWriteOnlyTransaction();
121
122             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
123             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
124             writeTx.write(carPath, car);
125
126             MapEntryNode person = PeopleModel.newPersonEntry("jack");
127             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
128             writeTx.write(personPath, person);
129
130             doCommit(writeTx.ready());
131
132             // Verify the data in the store
133
134             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
135
136             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
137             assertEquals("isPresent", true, optional.isPresent());
138             assertEquals("Data node", car, optional.get());
139
140             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
141             assertEquals("isPresent", true, optional.isPresent());
142             assertEquals("Data node", person, optional.get());
143
144             cleanup(dataStore);
145         }};
146     }
147
148     @Test
149     public void testReadWriteTransactionWithSingleShard() throws Exception{
150         System.setProperty("shard.persistent", "true");
151         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
152             DistributedDataStore dataStore =
153                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
154
155             // 1. Create a read-write Tx
156
157             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
158             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
159
160             // 2. Write some data
161
162             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
163             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
164             readWriteTx.write(nodePath, nodeToWrite );
165
166             // 3. Read the data from Tx
167
168             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
169             assertEquals("exists", true, exists);
170
171             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
172             assertEquals("isPresent", true, optional.isPresent());
173             assertEquals("Data node", nodeToWrite, optional.get());
174
175             // 4. Ready the Tx for commit
176
177             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
178
179             // 5. Commit the Tx
180
181             doCommit(cohort);
182
183             // 6. Verify the data in the store
184
185             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
186
187             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
188             assertEquals("isPresent", true, optional.isPresent());
189             assertEquals("Data node", nodeToWrite, optional.get());
190
191             cleanup(dataStore);
192         }};
193     }
194
195     @Test
196     public void testReadWriteTransactionWithMultipleShards() throws Exception{
197         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
198             DistributedDataStore dataStore =
199                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
200
201             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
202             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
203
204             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
205             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
206
207             doCommit(readWriteTx.ready());
208
209             readWriteTx = dataStore.newReadWriteTransaction();
210
211             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
212             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
213
214             doCommit(readWriteTx.ready());
215
216             readWriteTx = dataStore.newReadWriteTransaction();
217
218             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
219             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
220             readWriteTx.write(carPath, car);
221
222             MapEntryNode person = PeopleModel.newPersonEntry("jack");
223             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
224             readWriteTx.write(personPath, person);
225
226             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
227             assertEquals("exists", true, exists);
228
229             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
230             assertEquals("isPresent", true, optional.isPresent());
231             assertEquals("Data node", car, optional.get());
232
233             doCommit(readWriteTx.ready());
234
235             // Verify the data in the store
236
237             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
238
239             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
240             assertEquals("isPresent", true, optional.isPresent());
241             assertEquals("Data node", car, optional.get());
242
243             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
244             assertEquals("isPresent", true, optional.isPresent());
245             assertEquals("Data node", person, optional.get());
246
247             cleanup(dataStore);
248         }};
249     }
250
251     @Test
252     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
253         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
254             DistributedDataStore dataStore = setupDistributedDataStore(
255                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
256
257             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
258
259             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
260             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
261             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
262             doCommit(writeTx.ready());
263
264             writeTx = txChain.newWriteOnlyTransaction();
265
266             int nCars = 5;
267             for(int i = 0; i < nCars; i++) {
268                 writeTx.write(CarsModel.newCarPath("car" + i),
269                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
270             }
271
272             doCommit(writeTx.ready());
273
274             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
275                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
276             assertEquals("isPresent", true, optional.isPresent());
277             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
278
279             cleanup(dataStore);
280         }};
281     }
282
283     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
284             final boolean writeOnly) throws Exception {
285         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
286             String shardName = "test-1";
287
288             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
289             // initialized until we create and submit the write the Tx.
290             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
291             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
292             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
293
294             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
295
296             // Create the write Tx
297
298             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
299                     dataStore.newReadWriteTransaction();
300             assertNotNull("newReadWriteTransaction returned null", writeTx);
301
302             // Do some modification operations and ready the Tx on a separate thread.
303
304             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
305                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
306                             TestModel.ID_QNAME, 1).build();
307
308             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
309             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
310             final CountDownLatch txReady = new CountDownLatch(1);
311             Thread txThread = new Thread() {
312                 @Override
313                 public void run() {
314                     try {
315                         writeTx.write(TestModel.TEST_PATH,
316                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
317
318                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
319                                 TestModel.OUTER_LIST_QNAME).build());
320
321                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
322                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
323
324                         writeTx.delete(listEntryPath);
325
326                         txCohort.set(writeTx.ready());
327                     } catch(Exception e) {
328                         caughtEx.set(e);
329                         return;
330                     } finally {
331                         txReady.countDown();
332                     }
333                 }
334             };
335
336             txThread.start();
337
338             // Wait for the Tx operations to complete.
339
340             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
341             if(caughtEx.get() != null) {
342                 throw caughtEx.get();
343             }
344
345             assertEquals("Tx ready", true, done);
346
347             // At this point the Tx operations should be waiting for the shard to initialize so
348             // trigger the latch to let the shard recovery to continue.
349
350             blockRecoveryLatch.countDown();
351
352             // Wait for the Tx commit to complete.
353
354             doCommit(txCohort.get());
355
356             // Verify the data in the store
357
358             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
359
360             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
361                     get(5, TimeUnit.SECONDS);
362             assertEquals("isPresent", true, optional.isPresent());
363
364             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
365             assertEquals("isPresent", true, optional.isPresent());
366
367             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
368             assertEquals("isPresent", false, optional.isPresent());
369
370             cleanup(dataStore);
371         }};
372     }
373
374     @Test
375     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
376         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
377         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
378     }
379
380     @Test
381     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
382         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
383     }
384
385     @Test
386     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
387         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
388             String testName = "testTransactionReadsWithShardNotInitiallyReady";
389             String shardName = "test-1";
390
391             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
392             // initialized until we create the Tx.
393             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
394             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
395             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
396
397             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
398
399             // Create the read-write Tx
400
401             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
402             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
403
404             // Do some reads on the Tx on a separate thread.
405
406             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
407                     new AtomicReference<>();
408             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
409                     txReadFuture = new AtomicReference<>();
410             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
411             final CountDownLatch txReadsDone = new CountDownLatch(1);
412             Thread txThread = new Thread() {
413                 @Override
414                 public void run() {
415                     try {
416                         readWriteTx.write(TestModel.TEST_PATH,
417                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
418
419                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
420
421                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
422                     } catch(Exception e) {
423                         caughtEx.set(e);
424                         return;
425                     } finally {
426                         txReadsDone.countDown();
427                     }
428                 }
429             };
430
431             txThread.start();
432
433             // Wait for the Tx operations to complete.
434
435             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
436             if(caughtEx.get() != null) {
437                 throw caughtEx.get();
438             }
439
440             assertEquals("Tx reads done", true, done);
441
442             // At this point the Tx operations should be waiting for the shard to initialize so
443             // trigger the latch to let the shard recovery to continue.
444
445             blockRecoveryLatch.countDown();
446
447             // Wait for the reads to complete and verify.
448
449             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
450             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
451
452             readWriteTx.close();
453
454             cleanup(dataStore);
455         }};
456     }
457
458     @Test(expected=NotInitializedException.class)
459     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
460         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
461             String testName = "testTransactionCommitFailureWithShardNotInitialized";
462             String shardName = "test-1";
463
464             // Set the shard initialization timeout low for the test.
465
466             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
467
468             // Setup the InMemoryJournal to block shard recovery indefinitely.
469
470             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
471             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
472             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
473
474             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
475
476             // Create the write Tx
477
478             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
479             assertNotNull("newReadWriteTransaction returned null", writeTx);
480
481             // Do some modifications and ready the Tx on a separate thread.
482
483             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
484             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
485             final CountDownLatch txReady = new CountDownLatch(1);
486             Thread txThread = new Thread() {
487                 @Override
488                 public void run() {
489                     try {
490                         writeTx.write(TestModel.TEST_PATH,
491                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
492
493                         txCohort.set(writeTx.ready());
494                     } catch(Exception e) {
495                         caughtEx.set(e);
496                         return;
497                     } finally {
498                         txReady.countDown();
499                     }
500                 }
501             };
502
503             txThread.start();
504
505             // Wait for the Tx operations to complete.
506
507             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
508             if(caughtEx.get() != null) {
509                 throw caughtEx.get();
510             }
511
512             assertEquals("Tx ready", true, done);
513
514             // Wait for the commit to complete. Since the shard never initialized, the Tx should
515             // have timed out and throw an appropriate exception cause.
516
517             try {
518                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
519             } catch(ExecutionException e) {
520                 throw e.getCause();
521             } finally {
522                 blockRecoveryLatch.countDown();
523                 cleanup(dataStore);
524             }
525         }};
526     }
527
528     @Test(expected=NotInitializedException.class)
529     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
530         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
531             String testName = "testTransactionReadFailureWithShardNotInitialized";
532             String shardName = "test-1";
533
534             // Set the shard initialization timeout low for the test.
535
536             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
537
538             // Setup the InMemoryJournal to block shard recovery indefinitely.
539
540             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
541             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
542             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
543
544             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
545
546             // Create the read-write Tx
547
548             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
549             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
550
551             // Do a read on the Tx on a separate thread.
552
553             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
554                     txReadFuture = new AtomicReference<>();
555             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
556             final CountDownLatch txReadDone = new CountDownLatch(1);
557             Thread txThread = new Thread() {
558                 @Override
559                 public void run() {
560                     try {
561                         readWriteTx.write(TestModel.TEST_PATH,
562                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
563
564                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
565
566                         readWriteTx.close();
567                     } catch(Exception e) {
568                         caughtEx.set(e);
569                         return;
570                     } finally {
571                         txReadDone.countDown();
572                     }
573                 }
574             };
575
576             txThread.start();
577
578             // Wait for the Tx operations to complete.
579
580             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
581             if(caughtEx.get() != null) {
582                 throw caughtEx.get();
583             }
584
585             assertEquals("Tx read done", true, done);
586
587             // Wait for the read to complete. Since the shard never initialized, the Tx should
588             // have timed out and throw an appropriate exception cause.
589
590             try {
591                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
592             } catch(ReadFailedException e) {
593                 throw e.getCause();
594             } finally {
595                 blockRecoveryLatch.countDown();
596                 cleanup(dataStore);
597             }
598         }};
599     }
600
601     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
602         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
603             String testName = "testTransactionCommitFailureWithNoShardLeader";
604             String shardName = "default";
605
606             // We don't want the shard to become the leader so prevent shard election from completing
607             // by setting the election timeout, which is based on the heartbeat interval, really high.
608
609             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
610             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
611
612             // Set the leader election timeout low for the test.
613
614             datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS);
615
616             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
617
618             // Create the write Tx.
619
620             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
621                 dataStore.newReadWriteTransaction();
622             assertNotNull("newReadWriteTransaction returned null", writeTx);
623
624             // Do some modifications and ready the Tx on a separate thread.
625
626             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
627             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
628             final CountDownLatch txReady = new CountDownLatch(1);
629             Thread txThread = new Thread() {
630                 @Override
631                 public void run() {
632                     try {
633                         writeTx.write(TestModel.JUNK_PATH,
634                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
635
636                         txCohort.set(writeTx.ready());
637                     } catch(Exception e) {
638                         caughtEx.set(e);
639                         return;
640                     } finally {
641                         txReady.countDown();
642                     }
643                 }
644             };
645
646             txThread.start();
647
648             // Wait for the Tx operations to complete.
649
650             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
651             if(caughtEx.get() != null) {
652                 throw caughtEx.get();
653             }
654
655             assertEquals("Tx ready", true, done);
656
657             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
658             // should have timed out and throw an appropriate exception cause.
659
660             try {
661                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
662             } catch(ExecutionException e) {
663                 throw e.getCause();
664             } finally {
665                 cleanup(dataStore);
666             }
667         }};
668     }
669
670     @Test(expected=NoShardLeaderException.class)
671     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
672         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
673         testTransactionCommitFailureWithNoShardLeader(true);
674     }
675
676     @Test(expected=NoShardLeaderException.class)
677     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
678         testTransactionCommitFailureWithNoShardLeader(false);
679     }
680
681     @Test
682     public void testTransactionAbort() throws Exception{
683         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
684             DistributedDataStore dataStore =
685                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
686
687             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
688             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
689
690             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
691
692             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
693
694             cohort.canCommit().get(5, TimeUnit.SECONDS);
695
696             cohort.abort().get(5, TimeUnit.SECONDS);
697
698             testWriteTransaction(dataStore, TestModel.TEST_PATH,
699                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
700
701             cleanup(dataStore);
702         }};
703     }
704
705     @Test
706     public void testTransactionChainWithSingleShard() throws Exception{
707         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
708             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
709
710             // 1. Create a Tx chain and write-only Tx
711
712             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
713
714             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
715             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
716
717             // 2. Write some data
718
719             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
720             writeTx.write(TestModel.TEST_PATH, testNode);
721
722             // 3. Ready the Tx for commit
723
724             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
725
726             // 4. Commit the Tx on another thread that first waits for the second read Tx.
727
728             final CountDownLatch continueCommit1 = new CountDownLatch(1);
729             final CountDownLatch commit1Done = new CountDownLatch(1);
730             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
731             new Thread() {
732                 @Override
733                 public void run() {
734                     try {
735                         continueCommit1.await();
736                         doCommit(cohort1);
737                     } catch (Exception e) {
738                         commit1Error.set(e);
739                     } finally {
740                         commit1Done.countDown();
741                     }
742                 }
743             }.start();
744
745             // 5. Create a new read Tx from the chain to read and verify the data from the first
746             // Tx is visible after being readied.
747
748             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
749             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
750             assertEquals("isPresent", true, optional.isPresent());
751             assertEquals("Data node", testNode, optional.get());
752
753             // 6. Create a new RW Tx from the chain, write more data, and ready it
754
755             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
756             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
757             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
758
759             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
760
761             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
762             // verify it is visible.
763
764             readTx = txChain.newReadWriteTransaction();
765             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
766             assertEquals("isPresent", true, optional.isPresent());
767             assertEquals("Data node", outerNode, optional.get());
768
769             // 8. Wait for the 2 commits to complete and close the chain.
770
771             continueCommit1.countDown();
772             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
773
774             if(commit1Error.get() != null) {
775                 throw commit1Error.get();
776             }
777
778             doCommit(cohort2);
779
780             txChain.close();
781
782             // 9. Create a new read Tx from the data store and verify committed data.
783
784             readTx = dataStore.newReadOnlyTransaction();
785             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
786             assertEquals("isPresent", true, optional.isPresent());
787             assertEquals("Data node", outerNode, optional.get());
788
789             cleanup(dataStore);
790         }};
791     }
792
793     @Test
794     public void testTransactionChainWithMultipleShards() throws Exception{
795         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
796             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
797                     "cars-1", "people-1");
798
799             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
800
801             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
802             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
803
804             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
805             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
806
807             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
808             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
809
810             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
811
812             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
813
814             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
815             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
816             readWriteTx.write(carPath, car);
817
818             MapEntryNode person = PeopleModel.newPersonEntry("jack");
819             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
820             readWriteTx.merge(personPath, person);
821
822             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
823             assertEquals("isPresent", true, optional.isPresent());
824             assertEquals("Data node", car, optional.get());
825
826             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
827             assertEquals("isPresent", true, optional.isPresent());
828             assertEquals("Data node", person, optional.get());
829
830             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
831
832             writeTx = txChain.newWriteOnlyTransaction();
833
834             writeTx.delete(carPath);
835
836             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
837
838             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
839             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
840
841             doCommit(canCommit1, cohort1);
842             doCommit(canCommit2, cohort2);
843             doCommit(cohort3);
844
845             txChain.close();
846
847             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
848
849             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
850             assertEquals("isPresent", false, optional.isPresent());
851
852             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
853             assertEquals("isPresent", true, optional.isPresent());
854             assertEquals("Data node", person, optional.get());
855
856             cleanup(dataStore);
857         }};
858     }
859
860     @Test
861     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
862         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
863             DistributedDataStore dataStore = setupDistributedDataStore(
864                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
865
866             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
867                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
868                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
869
870             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
871             DOMTransactionChain txChain = broker.createTransactionChain(listener);
872
873             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
874
875             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
876             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
877             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
878             futures.add(writeTx.submit());
879
880             int nCars = 100;
881             for(int i = 0; i < nCars; i++) {
882                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
883
884                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
885                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
886
887                 futures.add(rwTx.submit());
888             }
889
890             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
891                 f.checkedGet();
892             }
893
894             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
895                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
896             assertEquals("isPresent", true, optional.isPresent());
897             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
898
899             txChain.close();
900
901             broker.close();
902
903             cleanup(dataStore);
904         }};
905     }
906
907     @Test
908     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
909         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
910             DistributedDataStore dataStore = setupDistributedDataStore(
911                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
912
913             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
914
915             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
916
917             rwTx1.ready();
918
919             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
920
921             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
922             assertEquals("isPresent", false, optional.isPresent());
923
924             txChain.close();
925
926             cleanup(dataStore);
927         }};
928     }
929
930     @Test
931     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
932         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
933             DistributedDataStore dataStore = setupDistributedDataStore(
934                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
935
936             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
937
938             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
939             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
940
941             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
942
943             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
944             // readied.
945
946             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
947         }};
948     }
949
950     @Test
951     public void testCreateChainedTransactionAfterClose() throws Throwable {
952         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
953             DistributedDataStore dataStore = setupDistributedDataStore(
954                     "testCreateChainedTransactionAfterClose", "test-1");
955
956             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
957
958             txChain.close();
959
960             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
961
962             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
963         }};
964     }
965
966     @Test
967     public void testChangeListenerRegistration() throws Exception{
968         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
969             DistributedDataStore dataStore =
970                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
971
972             testWriteTransaction(dataStore, TestModel.TEST_PATH,
973                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
974
975             MockDataChangeListener listener = new MockDataChangeListener(1);
976
977             ListenerRegistration<MockDataChangeListener>
978                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
979                             DataChangeScope.SUBTREE);
980
981             assertNotNull("registerChangeListener returned null", listenerReg);
982
983             // Wait for the initial notification
984
985             listener.waitForChangeEvents(TestModel.TEST_PATH);
986
987             listener.reset(2);
988
989             // Write 2 updates.
990
991             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
992                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
993
994             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
995                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
996             testWriteTransaction(dataStore, listPath,
997                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
998
999             // Wait for the 2 updates.
1000
1001             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1002
1003             listenerReg.close();
1004
1005             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1006                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1007                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1008
1009             listener.expectNoMoreChanges("Received unexpected change after close");
1010
1011             cleanup(dataStore);
1012         }};
1013     }
1014 }