CDS: Retry remote front-end transactions on AskTimeoutException
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.fail;
7 import static org.mockito.Matchers.any;
8 import static org.mockito.Matchers.eq;
9 import static org.mockito.Mockito.timeout;
10 import static org.mockito.Mockito.verify;
11 import akka.actor.ActorSystem;
12 import akka.actor.Address;
13 import akka.actor.AddressFromURIString;
14 import akka.cluster.Cluster;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import com.typesafe.config.ConfigFactory;
23 import java.io.IOException;
24 import java.math.BigInteger;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.AfterClass;
33 import org.junit.BeforeClass;
34 import org.junit.Test;
35 import org.mockito.Mockito;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
39 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
40 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
41 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
42 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
43 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
44 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
45 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
46 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
47 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
48 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
49 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
50 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
51 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
53 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
58 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
60 import org.opendaylight.yangtools.concepts.ListenerRegistration;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
64 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
66 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
67 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
68
69 public class DistributedDataStoreIntegrationTest {
70
71     private static ActorSystem system;
72
73     private final DatastoreContext.Builder datastoreContextBuilder =
74             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
75
76     @BeforeClass
77     public static void setUpClass() throws IOException {
78         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
79         Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
80         Cluster.get(system).join(member1Address);
81     }
82
83     @AfterClass
84     public static void tearDownClass() throws IOException {
85         JavaTestKit.shutdownActorSystem(system);
86         system = null;
87     }
88
89     protected ActorSystem getSystem() {
90         return system;
91     }
92
93     @Test
94     public void testWriteTransactionWithSingleShard() throws Exception{
95         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
96             DistributedDataStore dataStore =
97                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
98
99             testWriteTransaction(dataStore, TestModel.TEST_PATH,
100                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
101
102             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
103                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
104
105             cleanup(dataStore);
106         }};
107     }
108
109     @Test
110     public void testWriteTransactionWithMultipleShards() throws Exception{
111         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
112             DistributedDataStore dataStore =
113                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
114
115             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
116             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
117
118             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
119             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
120
121             doCommit(writeTx.ready());
122
123             writeTx = dataStore.newWriteOnlyTransaction();
124
125             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
126             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
127
128             doCommit(writeTx.ready());
129
130             writeTx = dataStore.newWriteOnlyTransaction();
131
132             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
133             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
134             writeTx.write(carPath, car);
135
136             MapEntryNode person = PeopleModel.newPersonEntry("jack");
137             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
138             writeTx.write(personPath, person);
139
140             doCommit(writeTx.ready());
141
142             // Verify the data in the store
143
144             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
145
146             Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
147             assertEquals("isPresent", true, optional.isPresent());
148             assertEquals("Data node", car, optional.get());
149
150             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
151             assertEquals("isPresent", true, optional.isPresent());
152             assertEquals("Data node", person, optional.get());
153
154             cleanup(dataStore);
155         }};
156     }
157
158     @Test
159     public void testReadWriteTransactionWithSingleShard() throws Exception{
160         System.setProperty("shard.persistent", "true");
161         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
162             DistributedDataStore dataStore =
163                     setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
164
165             // 1. Create a read-write Tx
166
167             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
168             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
169
170             // 2. Write some data
171
172             YangInstanceIdentifier nodePath = TestModel.TEST_PATH;
173             NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
174             readWriteTx.write(nodePath, nodeToWrite );
175
176             // 3. Read the data from Tx
177
178             Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS);
179             assertEquals("exists", true, exists);
180
181             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
182             assertEquals("isPresent", true, optional.isPresent());
183             assertEquals("Data node", nodeToWrite, optional.get());
184
185             // 4. Ready the Tx for commit
186
187             DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
188
189             // 5. Commit the Tx
190
191             doCommit(cohort);
192
193             // 6. Verify the data in the store
194
195             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
196
197             optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
198             assertEquals("isPresent", true, optional.isPresent());
199             assertEquals("Data node", nodeToWrite, optional.get());
200
201             cleanup(dataStore);
202         }};
203     }
204
205     @Test
206     public void testReadWriteTransactionWithMultipleShards() throws Exception{
207         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
208             DistributedDataStore dataStore =
209                     setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
210
211             DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
212             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
213
214             readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
215             readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
216
217             doCommit(readWriteTx.ready());
218
219             readWriteTx = dataStore.newReadWriteTransaction();
220
221             readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
222             readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
223
224             doCommit(readWriteTx.ready());
225
226             readWriteTx = dataStore.newReadWriteTransaction();
227
228             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
229             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
230             readWriteTx.write(carPath, car);
231
232             MapEntryNode person = PeopleModel.newPersonEntry("jack");
233             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
234             readWriteTx.write(personPath, person);
235
236             Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
237             assertEquals("exists", true, exists);
238
239             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
240             assertEquals("isPresent", true, optional.isPresent());
241             assertEquals("Data node", car, optional.get());
242
243             doCommit(readWriteTx.ready());
244
245             // Verify the data in the store
246
247             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
248
249             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
250             assertEquals("isPresent", true, optional.isPresent());
251             assertEquals("Data node", car, optional.get());
252
253             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
254             assertEquals("isPresent", true, optional.isPresent());
255             assertEquals("Data node", person, optional.get());
256
257             cleanup(dataStore);
258         }};
259     }
260
261     @Test
262     public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
263         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
264             DistributedDataStore dataStore = setupDistributedDataStore(
265                     "testSingleTransactionsWritesInQuickSuccession", "cars-1");
266
267             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
268
269             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
270             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
271             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
272             doCommit(writeTx.ready());
273
274             writeTx = txChain.newWriteOnlyTransaction();
275
276             int nCars = 5;
277             for(int i = 0; i < nCars; i++) {
278                 writeTx.write(CarsModel.newCarPath("car" + i),
279                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
280             }
281
282             doCommit(writeTx.ready());
283
284             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
285                     CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
286             assertEquals("isPresent", true, optional.isPresent());
287             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
288
289             cleanup(dataStore);
290         }};
291     }
292
293     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
294             final boolean writeOnly) throws Exception {
295         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
296             String shardName = "test-1";
297
298             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
299             // initialized until we create and submit the write the Tx.
300             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
301             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
302             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
303
304             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
305
306             // Create the write Tx
307
308             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
309                     dataStore.newReadWriteTransaction();
310             assertNotNull("newReadWriteTransaction returned null", writeTx);
311
312             // Do some modification operations and ready the Tx on a separate thread.
313
314             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder(
315                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME,
316                             TestModel.ID_QNAME, 1).build();
317
318             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
319             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
320             final CountDownLatch txReady = new CountDownLatch(1);
321             Thread txThread = new Thread() {
322                 @Override
323                 public void run() {
324                     try {
325                         writeTx.write(TestModel.TEST_PATH,
326                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
327
328                         writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(
329                                 TestModel.OUTER_LIST_QNAME).build());
330
331                         writeTx.write(listEntryPath, ImmutableNodes.mapEntry(
332                                 TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
333
334                         writeTx.delete(listEntryPath);
335
336                         txCohort.set(writeTx.ready());
337                     } catch(Exception e) {
338                         caughtEx.set(e);
339                         return;
340                     } finally {
341                         txReady.countDown();
342                     }
343                 }
344             };
345
346             txThread.start();
347
348             // Wait for the Tx operations to complete.
349
350             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
351             if(caughtEx.get() != null) {
352                 throw caughtEx.get();
353             }
354
355             assertEquals("Tx ready", true, done);
356
357             // At this point the Tx operations should be waiting for the shard to initialize so
358             // trigger the latch to let the shard recovery to continue.
359
360             blockRecoveryLatch.countDown();
361
362             // Wait for the Tx commit to complete.
363
364             doCommit(txCohort.get());
365
366             // Verify the data in the store
367
368             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
369
370             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).
371                     get(5, TimeUnit.SECONDS);
372             assertEquals("isPresent", true, optional.isPresent());
373
374             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
375             assertEquals("isPresent", true, optional.isPresent());
376
377             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
378             assertEquals("isPresent", false, optional.isPresent());
379
380             cleanup(dataStore);
381         }};
382     }
383
384     @Test
385     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
386         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
387         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
388     }
389
390     @Test
391     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
392         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
393     }
394
395     @Test
396     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
397         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
398             String testName = "testTransactionReadsWithShardNotInitiallyReady";
399             String shardName = "test-1";
400
401             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
402             // initialized until we create the Tx.
403             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
404             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
405             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
406
407             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
408
409             // Create the read-write Tx
410
411             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
412             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
413
414             // Do some reads on the Tx on a separate thread.
415
416             final AtomicReference<CheckedFuture<Boolean, ReadFailedException>> txExistsFuture =
417                     new AtomicReference<>();
418             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
419                     txReadFuture = new AtomicReference<>();
420             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
421             final CountDownLatch txReadsDone = new CountDownLatch(1);
422             Thread txThread = new Thread() {
423                 @Override
424                 public void run() {
425                     try {
426                         readWriteTx.write(TestModel.TEST_PATH,
427                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
428
429                         txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
430
431                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
432                     } catch(Exception e) {
433                         caughtEx.set(e);
434                         return;
435                     } finally {
436                         txReadsDone.countDown();
437                     }
438                 }
439             };
440
441             txThread.start();
442
443             // Wait for the Tx operations to complete.
444
445             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
446             if(caughtEx.get() != null) {
447                 throw caughtEx.get();
448             }
449
450             assertEquals("Tx reads done", true, done);
451
452             // At this point the Tx operations should be waiting for the shard to initialize so
453             // trigger the latch to let the shard recovery to continue.
454
455             blockRecoveryLatch.countDown();
456
457             // Wait for the reads to complete and verify.
458
459             assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS));
460             assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent());
461
462             readWriteTx.close();
463
464             cleanup(dataStore);
465         }};
466     }
467
468     @Test(expected=NotInitializedException.class)
469     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
470         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
471             String testName = "testTransactionCommitFailureWithShardNotInitialized";
472             String shardName = "test-1";
473
474             // Set the shard initialization timeout low for the test.
475
476             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
477
478             // Setup the InMemoryJournal to block shard recovery indefinitely.
479
480             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
481             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
482             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
483
484             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
485
486             // Create the write Tx
487
488             final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
489             assertNotNull("newReadWriteTransaction returned null", writeTx);
490
491             // Do some modifications and ready the Tx on a separate thread.
492
493             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
494             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
495             final CountDownLatch txReady = new CountDownLatch(1);
496             Thread txThread = new Thread() {
497                 @Override
498                 public void run() {
499                     try {
500                         writeTx.write(TestModel.TEST_PATH,
501                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
502
503                         txCohort.set(writeTx.ready());
504                     } catch(Exception e) {
505                         caughtEx.set(e);
506                         return;
507                     } finally {
508                         txReady.countDown();
509                     }
510                 }
511             };
512
513             txThread.start();
514
515             // Wait for the Tx operations to complete.
516
517             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
518             if(caughtEx.get() != null) {
519                 throw caughtEx.get();
520             }
521
522             assertEquals("Tx ready", true, done);
523
524             // Wait for the commit to complete. Since the shard never initialized, the Tx should
525             // have timed out and throw an appropriate exception cause.
526
527             try {
528                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
529             } catch(ExecutionException e) {
530                 throw e.getCause();
531             } finally {
532                 blockRecoveryLatch.countDown();
533                 cleanup(dataStore);
534             }
535         }};
536     }
537
538     @Test(expected=NotInitializedException.class)
539     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
540         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
541             String testName = "testTransactionReadFailureWithShardNotInitialized";
542             String shardName = "test-1";
543
544             // Set the shard initialization timeout low for the test.
545
546             datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
547
548             // Setup the InMemoryJournal to block shard recovery indefinitely.
549
550             String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
551             CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
552             InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
553
554             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
555
556             // Create the read-write Tx
557
558             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
559             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
560
561             // Do a read on the Tx on a separate thread.
562
563             final AtomicReference<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>>
564                     txReadFuture = new AtomicReference<>();
565             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
566             final CountDownLatch txReadDone = new CountDownLatch(1);
567             Thread txThread = new Thread() {
568                 @Override
569                 public void run() {
570                     try {
571                         readWriteTx.write(TestModel.TEST_PATH,
572                                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
573
574                         txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
575
576                         readWriteTx.close();
577                     } catch(Exception e) {
578                         caughtEx.set(e);
579                         return;
580                     } finally {
581                         txReadDone.countDown();
582                     }
583                 }
584             };
585
586             txThread.start();
587
588             // Wait for the Tx operations to complete.
589
590             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
591             if(caughtEx.get() != null) {
592                 throw caughtEx.get();
593             }
594
595             assertEquals("Tx read done", true, done);
596
597             // Wait for the read to complete. Since the shard never initialized, the Tx should
598             // have timed out and throw an appropriate exception cause.
599
600             try {
601                 txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
602             } catch(ReadFailedException e) {
603                 throw e.getCause();
604             } finally {
605                 blockRecoveryLatch.countDown();
606                 cleanup(dataStore);
607             }
608         }};
609     }
610
611     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
612         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
613             String testName = "testTransactionCommitFailureWithNoShardLeader";
614             String shardName = "default";
615
616             // We don't want the shard to become the leader so prevent shard election from completing
617             // by setting the election timeout, which is based on the heartbeat interval, really high.
618
619             datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000);
620
621             DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName);
622
623             Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(),
624                     new FindLocalShard(shardName, true));
625             assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
626
627             // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly.
628
629             datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
630             dataStore.onDatastoreContextUpdated(datastoreContextBuilder.build());
631
632             // Create the write Tx.
633
634             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() :
635                 dataStore.newReadWriteTransaction();
636             assertNotNull("newReadWriteTransaction returned null", writeTx);
637
638             // Do some modifications and ready the Tx on a separate thread.
639
640             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
641             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
642             final CountDownLatch txReady = new CountDownLatch(1);
643             Thread txThread = new Thread() {
644                 @Override
645                 public void run() {
646                     try {
647                         writeTx.write(TestModel.JUNK_PATH,
648                                 ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
649
650                         txCohort.set(writeTx.ready());
651                     } catch(Exception e) {
652                         caughtEx.set(e);
653                         return;
654                     } finally {
655                         txReady.countDown();
656                     }
657                 }
658             };
659
660             txThread.start();
661
662             // Wait for the Tx operations to complete.
663
664             boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
665             if(caughtEx.get() != null) {
666                 throw caughtEx.get();
667             }
668
669             assertEquals("Tx ready", true, done);
670
671             // Wait for the commit to complete. Since no shard leader was elected in time, the Tx
672             // should have timed out and throw an appropriate exception cause.
673
674             try {
675                 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
676             } catch(ExecutionException e) {
677                 throw e.getCause();
678             } finally {
679                 cleanup(dataStore);
680             }
681         }};
682     }
683
684     @Test(expected=NoShardLeaderException.class)
685     public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable {
686         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
687         testTransactionCommitFailureWithNoShardLeader(true);
688     }
689
690     @Test(expected=NoShardLeaderException.class)
691     public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable {
692         testTransactionCommitFailureWithNoShardLeader(false);
693     }
694
695     @Test
696     public void testTransactionAbort() throws Exception{
697         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
698             DistributedDataStore dataStore =
699                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
700
701             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
702             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
703
704             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
705
706             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
707
708             cohort.canCommit().get(5, TimeUnit.SECONDS);
709
710             cohort.abort().get(5, TimeUnit.SECONDS);
711
712             testWriteTransaction(dataStore, TestModel.TEST_PATH,
713                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
714
715             cleanup(dataStore);
716         }};
717     }
718
719     @Test
720     public void testTransactionChainWithSingleShard() throws Exception{
721         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
722             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
723
724             // 1. Create a Tx chain and write-only Tx
725
726             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
727
728             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
729             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
730
731             // 2. Write some data
732
733             NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
734             writeTx.write(TestModel.TEST_PATH, testNode);
735
736             // 3. Ready the Tx for commit
737
738             final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
739
740             // 4. Commit the Tx on another thread that first waits for the second read Tx.
741
742             final CountDownLatch continueCommit1 = new CountDownLatch(1);
743             final CountDownLatch commit1Done = new CountDownLatch(1);
744             final AtomicReference<Exception> commit1Error = new AtomicReference<>();
745             new Thread() {
746                 @Override
747                 public void run() {
748                     try {
749                         continueCommit1.await();
750                         doCommit(cohort1);
751                     } catch (Exception e) {
752                         commit1Error.set(e);
753                     } finally {
754                         commit1Done.countDown();
755                     }
756                 }
757             }.start();
758
759             // 5. Create a new read Tx from the chain to read and verify the data from the first
760             // Tx is visible after being readied.
761
762             DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
763             Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
764             assertEquals("isPresent", true, optional.isPresent());
765             assertEquals("Data node", testNode, optional.get());
766
767             // 6. Create a new RW Tx from the chain, write more data, and ready it
768
769             DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
770             MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
771             rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
772
773             DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
774
775             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
776             // verify it is visible.
777
778             readTx = txChain.newReadWriteTransaction();
779             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
780             assertEquals("isPresent", true, optional.isPresent());
781             assertEquals("Data node", outerNode, optional.get());
782
783             // 8. Wait for the 2 commits to complete and close the chain.
784
785             continueCommit1.countDown();
786             Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
787
788             if(commit1Error.get() != null) {
789                 throw commit1Error.get();
790             }
791
792             doCommit(cohort2);
793
794             txChain.close();
795
796             // 9. Create a new read Tx from the data store and verify committed data.
797
798             readTx = dataStore.newReadOnlyTransaction();
799             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
800             assertEquals("isPresent", true, optional.isPresent());
801             assertEquals("Data node", outerNode, optional.get());
802
803             cleanup(dataStore);
804         }};
805     }
806
807     @Test
808     public void testTransactionChainWithMultipleShards() throws Exception{
809         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
810             DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
811                     "cars-1", "people-1");
812
813             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
814
815             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
816             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
817
818             writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
819             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
820
821             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
822             writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
823
824             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
825
826             DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
827
828             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
829             YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
830             readWriteTx.write(carPath, car);
831
832             MapEntryNode person = PeopleModel.newPersonEntry("jack");
833             YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
834             readWriteTx.merge(personPath, person);
835
836             Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
837             assertEquals("isPresent", true, optional.isPresent());
838             assertEquals("Data node", car, optional.get());
839
840             optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
841             assertEquals("isPresent", true, optional.isPresent());
842             assertEquals("Data node", person, optional.get());
843
844             DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
845
846             writeTx = txChain.newWriteOnlyTransaction();
847
848             writeTx.delete(carPath);
849
850             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
851
852             ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
853             ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
854
855             doCommit(canCommit1, cohort1);
856             doCommit(canCommit2, cohort2);
857             doCommit(cohort3);
858
859             txChain.close();
860
861             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
862
863             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
864             assertEquals("isPresent", false, optional.isPresent());
865
866             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
867             assertEquals("isPresent", true, optional.isPresent());
868             assertEquals("Data node", person, optional.get());
869
870             cleanup(dataStore);
871         }};
872     }
873
874     @Test
875     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
876         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
877             DistributedDataStore dataStore = setupDistributedDataStore(
878                     "testCreateChainedTransactionsInQuickSuccession", "cars-1");
879
880             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
881                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
882                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
883
884             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
885             DOMTransactionChain txChain = broker.createTransactionChain(listener);
886
887             List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
888
889             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
890             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
891             writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
892             futures.add(writeTx.submit());
893
894             int nCars = 100;
895             for(int i = 0; i < nCars; i++) {
896                 DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
897
898                 rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
899                         CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
900
901                 futures.add(rwTx.submit());
902             }
903
904             for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
905                 f.checkedGet();
906             }
907
908             Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
909                     LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
910             assertEquals("isPresent", true, optional.isPresent());
911             assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
912
913             txChain.close();
914
915             broker.close();
916
917             cleanup(dataStore);
918         }};
919     }
920
921     @Test
922     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
923         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
924             DistributedDataStore dataStore = setupDistributedDataStore(
925                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
926
927             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
928
929             DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction();
930
931             rwTx1.ready();
932
933             DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction();
934
935             Optional<NormalizedNode<?, ?>> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
936             assertEquals("isPresent", false, optional.isPresent());
937
938             txChain.close();
939
940             cleanup(dataStore);
941         }};
942     }
943
944     @Test
945     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
946         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
947             DistributedDataStore dataStore = setupDistributedDataStore(
948                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
949
950             final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
951
952             DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
953             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
954
955             writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
956
957             // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
958             // readied.
959
960             assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
961         }};
962     }
963
964     @Test
965     public void testCreateChainedTransactionAfterClose() throws Throwable {
966         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
967             DistributedDataStore dataStore = setupDistributedDataStore(
968                     "testCreateChainedTransactionAfterClose", "test-1");
969
970             DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
971
972             txChain.close();
973
974             // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
975
976             assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
977         }};
978     }
979
980     @Test
981     public void testChainedTransactionFailureWithSingleShard() throws Exception{
982         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
983             DistributedDataStore dataStore = setupDistributedDataStore(
984                     "testChainedTransactionFailureWithSingleShard", "cars-1");
985
986             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
987                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
988                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
989
990             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
991             DOMTransactionChain txChain = broker.createTransactionChain(listener);
992
993             DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
994
995             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
996                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
997                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
998
999             rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1000
1001             try {
1002                 rwTx.submit().checkedGet(5, TimeUnit.SECONDS);
1003                 fail("Expected TransactionCommitFailedException");
1004             } catch (TransactionCommitFailedException e) {
1005                 // Expected
1006             }
1007
1008             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class));
1009
1010             txChain.close();
1011             broker.close();
1012             cleanup(dataStore);
1013         }};
1014     }
1015
1016     @Test
1017     public void testChainedTransactionFailureWithMultipleShards() throws Exception{
1018         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1019             DistributedDataStore dataStore = setupDistributedDataStore(
1020                     "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1");
1021
1022             ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
1023                     ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
1024                             LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
1025
1026             TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
1027             DOMTransactionChain txChain = broker.createTransactionChain(listener);
1028
1029             DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
1030
1031             writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
1032
1033             ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
1034                     new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
1035                         withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
1036
1037             // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
1038             // done for put for performance reasons.
1039             writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
1040
1041             try {
1042                 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
1043                 fail("Expected TransactionCommitFailedException");
1044             } catch (TransactionCommitFailedException e) {
1045                 // Expected
1046             }
1047
1048             verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
1049
1050             txChain.close();
1051             broker.close();
1052             cleanup(dataStore);
1053         }};
1054     }
1055
1056     @Test
1057     public void testChangeListenerRegistration() throws Exception{
1058         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
1059             DistributedDataStore dataStore =
1060                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
1061
1062             testWriteTransaction(dataStore, TestModel.TEST_PATH,
1063                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
1064
1065             MockDataChangeListener listener = new MockDataChangeListener(1);
1066
1067             ListenerRegistration<MockDataChangeListener>
1068                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
1069                             DataChangeScope.SUBTREE);
1070
1071             assertNotNull("registerChangeListener returned null", listenerReg);
1072
1073             // Wait for the initial notification
1074
1075             listener.waitForChangeEvents(TestModel.TEST_PATH);
1076
1077             listener.reset(2);
1078
1079             // Write 2 updates.
1080
1081             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
1082                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
1083
1084             YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1085                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
1086             testWriteTransaction(dataStore, listPath,
1087                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
1088
1089             // Wait for the 2 updates.
1090
1091             listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
1092
1093             listenerReg.close();
1094
1095             testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
1096                     nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
1097                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
1098
1099             listener.expectNoMoreChanges("Received unexpected change after close");
1100
1101             cleanup(dataStore);
1102         }};
1103     }
1104 }