+ }
+ }.start();
+
+ // 5. Create a new read Tx from the chain to read and verify the data from the first
+ // Tx is visible after being readied.
+
+ DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", testNode, optional.get());
+
+ // 6. Create a new RW Tx from the chain, write more data, and ready it
+
+ DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+ rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+ DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+ // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
+ // verify it is visible.
+
+ readTx = txChain.newReadWriteTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ // 8. Wait for the 2 commits to complete and close the chain.
+
+ continueCommit1.countDown();
+ Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+ if(commit1Error.get() != null) {
+ throw commit1Error.get();
+ }
+
+ doCommit(cohort2);
+
+ txChain.close();
+
+ // 9. Create a new read Tx from the data store and verify committed data.
+
+ readTx = dataStore.newReadOnlyTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testTransactionChainWithMultipleShards() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
+ "cars-1", "people-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+ DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+ DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
+
+ MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+ YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+ readWriteTx.write(carPath, car);
+
+ MapEntryNode person = PeopleModel.newPersonEntry("jack");
+ YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+ readWriteTx.merge(personPath, person);
+
+ Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", car, optional.get());
+
+ optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", person, optional.get());
+
+ DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+ writeTx = txChain.newWriteOnlyTransaction();
+
+ //writeTx.delete(personPath);
+
+ DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
+
+ doCommit(cohort1);
+ doCommit(cohort2);
+ doCommit(cohort3);
+
+ txChain.close();
+
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+ optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("Data node", car, optional.get());
+
+ optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+ //assertEquals("isPresent", false, optional.isPresent());
+ assertEquals("isPresent", true, optional.isPresent());
+
+ cleanup(dataStore);
+ }};
+ }
+
+ @Test
+ public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testCreateChainedTransactionsInQuickSuccession", "test-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ int nTxs = 20;
+ List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
+ for(int i = 0; i < nTxs; i++) {
+ DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+
+ rwTx.merge(TestModel.TEST_PATH, testNode);
+
+ cohorts.add(rwTx.ready());
+
+ }
+
+ for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
+ doCommit(cohort);