- new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
- {
- try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
- "test-1")) {
-
- // 1. Create a Tx chain and write-only Tx
-
- DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
-
- DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- assertNotNull("newWriteOnlyTransaction returned null", writeTx);
-
- // 2. Write some data
-
- NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- writeTx.write(TestModel.TEST_PATH, testNode);
-
- // 3. Ready the Tx for commit
-
- final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
-
- // 4. Commit the Tx on another thread that first waits for
- // the second read Tx.
-
- final CountDownLatch continueCommit1 = new CountDownLatch(1);
- final CountDownLatch commit1Done = new CountDownLatch(1);
- final AtomicReference<Exception> commit1Error = new AtomicReference<>();
- new Thread() {
- @Override
- public void run() {
- try {
- continueCommit1.await();
- doCommit(cohort1);
- } catch (Exception e) {
- commit1Error.set(e);
- } finally {
- commit1Done.countDown();
- }
- }
- }.start();
-
- // 5. Create a new read Tx from the chain to read and verify
- // the data from the first
- // Tx is visible after being readied.
-
- DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
- Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", testNode, optional.get());
-
- // 6. Create a new RW Tx from the chain, write more data,
- // and ready it
-
- DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
- MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
- rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
-
- final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
-
- // 7. Create a new read Tx from the chain to read the data
- // from the last RW Tx to
- // verify it is visible.
-
- readTx = txChain.newReadWriteTransaction();
- optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertEquals("isPresent", true, optional.isPresent());
- assertEquals("Data node", outerNode, optional.get());
-
- // 8. Wait for the 2 commits to complete and close the
- // chain.
-
- continueCommit1.countDown();
- Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
-
- if (commit1Error.get() != null) {
- throw commit1Error.get();
- }
-
- doCommit(cohort2);
+ final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
+ testParameter, "testTransactionChainWithSingleShard", "test-1")) {
+
+ // 1. Create a Tx chain and write-only Tx
+ final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+ // 2. Write some data
+ final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ writeTx.write(TestModel.TEST_PATH, testNode);
+
+ // 3. Ready the Tx for commit
+ final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+ // 4. Commit the Tx on another thread that first waits for
+ // the second read Tx.
+ final CountDownLatch continueCommit1 = new CountDownLatch(1);
+ final CountDownLatch commit1Done = new CountDownLatch(1);
+ final AtomicReference<Exception> commit1Error = new AtomicReference<>();
+ new Thread(() -> {
+ try {
+ continueCommit1.await();
+ testKit.doCommit(cohort1);
+ } catch (Exception e) {
+ commit1Error.set(e);
+ } finally {
+ commit1Done.countDown();
+ }
+ }).start();
+
+ // 5. Create a new read Tx from the chain to read and verify
+ // the data from the first
+ // Tx is visible after being readied.
+ DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("Data node", testNode, optional.get());
+
+ // 6. Create a new RW Tx from the chain, write more data,
+ // and ready it
+ final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
+ .build();
+ rwTx.write(TestModel.OUTER_LIST_PATH, outerNode);
+
+ final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready();
+
+ // 7. Create a new read Tx from the chain to read the data
+ // from the last RW Tx to
+ // verify it is visible.
+ readTx = txChain.newReadWriteTransaction();
+ optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("Data node", outerNode, optional.get());
+
+ // 8. Wait for the 2 commits to complete and close the
+ // chain.
+ continueCommit1.countDown();
+ Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS);
+
+ if (commit1Error.get() != null) {
+ throw commit1Error.get();
+ }