From 567da9db91b014415e2a5a005879f21fe03ab129 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 22 Aug 2018 15:46:20 +0200 Subject: [PATCH] Eliminate IntegrationTestKit subclasses The kit can be used as a simple class, no need to subclass it. Change-Id: I098bb72411a662bf8670fd945dba517d6253dea4 Signed-off-by: Robert Varga --- .../DistributedDataStoreIntegrationTest.java | 1600 ++++++++--------- 1 file changed, 766 insertions(+), 834 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 2c70977bfc..4aa2357a82 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; @@ -130,284 +129,267 @@ public class DistributedDataStoreIntegrationTest { @Test public void testWriteTransactionWithSingleShard() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "transactionIntegrationTest", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "transactionIntegrationTest", "test-1")) { - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - } - } - }; + testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .build()); + } } @Test public void testWriteTransactionWithMultipleShards() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - doCommit(writeTx.ready()); + testKit.doCommit(writeTx.ready()); - writeTx = dataStore.newWriteOnlyTransaction(); + writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - doCommit(writeTx.ready()); + testKit.doCommit(writeTx.ready()); - writeTx = dataStore.newWriteOnlyTransaction(); + writeTx = dataStore.newWriteOnlyTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - writeTx.write(carPath, car); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - writeTx.write(personPath, person); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); - doCommit(writeTx.ready()); + testKit.doCommit(writeTx.ready()); - // Verify the data in the store - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // Verify the data in the store + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); - } - } - }; + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } } @Test public void testReadWriteTransactionWithSingleShard() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { - // 1. Create a read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + // 1. Create a read-write Tx + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // 2. Write some data - final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - readWriteTx.write(nodePath, nodeToWrite); + // 2. Write some data + final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + readWriteTx.write(nodePath, nodeToWrite); - // 3. Read the data from Tx - final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + // 3. Read the data from Tx + final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); - // 4. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); + // 4. Ready the Tx for commit + final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); - // 5. Commit the Tx - doCommit(cohort); + // 5. Commit the Tx + testKit.doCommit(cohort); - // 6. Verify the data in the store - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // 6. Verify the data in the store + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); - } - } - }; + optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } } @Test public void testReadWriteTransactionWithMultipleShards() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - doCommit(readWriteTx.ready()); + testKit.doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - doCommit(readWriteTx.ready()); + testKit.doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.write(personPath, person); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); - final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - doCommit(readWriteTx.ready()); + testKit.doCommit(readWriteTx.ready()); - // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // Verify the data in the store + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); - - } - } - }; + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } } @Test public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - doCommit(writeTx.ready()); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + testKit.doCommit(writeTx.ready()); - writeTx = txChain.newWriteOnlyTransaction(); + writeTx = txChain.newWriteOnlyTransaction(); - int numCars = 5; - for (int i = 0; i < numCars; i++) { - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - } + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + } - doCommit(writeTx.ready()); + testKit.doCommit(writeTx.ready()); - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); - } - } - }; + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } } @SuppressWarnings("checkstyle:IllegalCatch") private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String shardName = "test-1"; - - // Setup the InMemoryJournal to block shard recovery to ensure - // the shard isn't - // initialized until we create and submit the write the Tx. - final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, testName, false, shardName)) { - - // Create the write Tx - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() - : dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); - - // Do some modification operations and ready the Tx on a - // separate thread. - final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier - .builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - final Thread txThread = new Thread(() -> { - try { - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - writeTx.merge(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - - writeTx.write(listEntryPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - - writeTx.delete(listEntryPath); - - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - } finally { - txReady.countDown(); - } - }); - - txThread.start(); - - // Wait for the Tx operations to complete. - final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Tx ready", true, done); + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure + // the shard isn't + // initialized until we create and submit the write the Tx. + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, testName, false, shardName)) { + + // Create the write Tx + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modification operations and ready the Tx on a + // separate thread. + final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier + .builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - // At this point the Tx operations should be waiting for the - // shard to initialize so - // trigger the latch to let the shard recovery to continue. - blockRecoveryLatch.countDown(); + writeTx.merge(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .build()); - // Wait for the Tx commit to complete. - doCommit(txCohort.get()); + writeTx.write(listEntryPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - // Verify the data in the store - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + writeTx.delete(listEntryPath); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); + } + }); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + txThread.start(); - optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); - } + // Wait for the Tx operations to complete. + final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); } - }; + + assertEquals("Tx ready", true, done); + + // At this point the Tx operations should be waiting for the + // shard to initialize so + // trigger the latch to let the shard recovery to continue. + blockRecoveryLatch.countDown(); + + // Wait for the Tx commit to complete. + testKit.doCommit(txCohort.get()); + + // Verify the data in the store + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + } } @Test @@ -424,95 +406,231 @@ public class DistributedDataStoreIntegrationTest { @Test @SuppressWarnings("checkstyle:IllegalCatch") public void testTransactionReadsWithShardNotInitiallyReady() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String testName = "testTransactionReadsWithShardNotInitiallyReady"; - final String shardName = "test-1"; - - // Setup the InMemoryJournal to block shard recovery to ensure - // the shard isn't - // initialized until we create the Tx. - final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, testName, false, shardName)) { - - // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); - - // Do some reads on the Tx on a separate thread. - final AtomicReference> txExistsFuture = new AtomicReference<>(); - final AtomicReference>>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadsDone = new CountDownLatch(1); - final Thread txThread = new Thread(() -> { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); - - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - } catch (Exception e) { - caughtEx.set(e); - } finally { - txReadsDone.countDown(); - } - }); - - txThread.start(); - - // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); - } + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String testName = "testTransactionReadsWithShardNotInitiallyReady"; + final String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure + // the shard isn't + // initialized until we create the Tx. + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, testName, false, shardName)) { + + // Create the read-write Tx + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do some reads on the Tx on a separate thread. + final AtomicReference> txExistsFuture = new AtomicReference<>(); + final AtomicReference>>> txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadsDone = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - assertEquals("Tx reads done", true, done); + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); - // At this point the Tx operations should be waiting for the - // shard to initialize so - // trigger the latch to let the shard recovery to continue. - blockRecoveryLatch.countDown(); + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReadsDone.countDown(); + } + }); - // Wait for the reads to complete and verify. - assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS)); - assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent()); + txThread.start(); - readWriteTx.close(); - } + // Wait for the Tx operations to complete. + boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); } - }; + + assertEquals("Tx reads done", true, done); + + // At this point the Tx operations should be waiting for the + // shard to initialize so + // trigger the latch to let the shard recovery to continue. + blockRecoveryLatch.countDown(); + + // Wait for the reads to complete and verify. + assertEquals("exists", true, txExistsFuture.get().get(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent()); + + readWriteTx.close(); + } } @Test(expected = NotInitializedException.class) @SuppressWarnings("checkstyle:IllegalCatch") public void testTransactionCommitFailureWithShardNotInitialized() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String testName = "testTransactionCommitFailureWithShardNotInitialized"; - final String shardName = "test-1"; + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String testName = "testTransactionCommitFailureWithShardNotInitialized"; + final String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery + // indefinitely. + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); + + final AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName); + + // Create the write Tx + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate + // thread. + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txCohort.set(writeTx.ready()); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReady.countDown(); + } + }); - // Set the shard initialization timeout low for the test. - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + txThread.start(); + + // Wait for the Tx operations to complete. + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since the shard never + // initialized, the Tx should + // have timed out and throw an appropriate exception cause. + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + fail("Expected NotInitializedException"); + } catch (final Exception e) { + final Throwable root = Throwables.getRootCause(e); + Throwables.throwIfUnchecked(root); + throw new RuntimeException(root); + } finally { + blockRecoveryLatch.countDown(); + } + } - // Setup the InMemoryJournal to block shard recovery - // indefinitely. - final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + @Test(expected = NotInitializedException.class) + @SuppressWarnings("checkstyle:IllegalCatch") + public void testTransactionReadFailureWithShardNotInitialized() throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String testName = "testTransactionReadFailureWithShardNotInitialized"; + final String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery + // indefinitely. + final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) { - final AbstractDataStore dataStore = - setupAbstractDataStore(testParameter, testName, false, shardName); + // Create the read-write Tx + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + // Do a read on the Tx on a separate thread. + final AtomicReference>>> txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadDone = new CountDownLatch(1); + final Thread txThread = new Thread(() -> { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + + readWriteTx.close(); + } catch (Exception e) { + caughtEx.set(e); + } finally { + txReadDone.countDown(); + } + }); + + txThread.start(); + + // Wait for the Tx operations to complete. + boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx read done", true, done); + + // Wait for the read to complete. Since the shard never + // initialized, the Tx should + // have timed out and throw an appropriate exception cause. + try { + txReadFuture.get().get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue("Expected ReadFailedException cause: " + e.getCause(), + e.getCause() instanceof ReadFailedException); + final Throwable root = Throwables.getRootCause(e); + Throwables.throwIfUnchecked(root); + throw new RuntimeException(root); + } finally { + blockRecoveryLatch.countDown(); + } + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) + throws Exception { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String shardName = "default"; + + // We don't want the shard to become the leader so prevent shard + // elections. + datastoreContextBuilder.customRaftPolicyImplementation( + "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); + + // The ShardManager uses the election timeout for FindPrimary so + // reset it low so it will timeout quickly. + datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) + .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); + + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) { + + final Object result = dataStore.getActorContext().executeOperation( + dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); + + // Create the write Tx. + DOMStoreWriteTransaction writeTxToClose = null; + try { + writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction() + : dataStore.newReadWriteTransaction(); + final DOMStoreWriteTransaction writeTx = writeTxToClose; assertNotNull("newReadWriteTransaction returned null", writeTx); // Do some modifications and ready the Tx on a separate @@ -522,7 +640,8 @@ public class DistributedDataStoreIntegrationTest { final CountDownLatch txReady = new CountDownLatch(1); final Thread txThread = new Thread(() -> { try { - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); txCohort.set(writeTx.ready()); } catch (Exception e) { @@ -542,187 +661,33 @@ public class DistributedDataStoreIntegrationTest { assertEquals("Tx ready", true, done); - // Wait for the commit to complete. Since the shard never - // initialized, the Tx should - // have timed out and throw an appropriate exception cause. + // Wait for the commit to complete. Since no shard + // leader was elected in time, the Tx + // should have timed out and throw an appropriate + // exception cause. try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - fail("Expected NotInitializedException"); - } catch (final Exception e) { - final Throwable root = Throwables.getRootCause(e); - Throwables.throwIfUnchecked(root); - throw new RuntimeException(root); - } finally { - blockRecoveryLatch.countDown(); - } - } - }; - } - - @Test(expected = NotInitializedException.class) - @SuppressWarnings("checkstyle:IllegalCatch") - public void testTransactionReadFailureWithShardNotInitialized() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String testName = "testTransactionReadFailureWithShardNotInitialized"; - final String shardName = "test-1"; - - // Set the shard initialization timeout low for the test. - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); - - // Setup the InMemoryJournal to block shard recovery - // indefinitely. - final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); - final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); - InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - - InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, testName, false, shardName)) { - - // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); - - // Do a read on the Tx on a separate thread. - final AtomicReference>>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadDone = new CountDownLatch(1); - final Thread txThread = new Thread(() -> { - try { - readWriteTx.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - - readWriteTx.close(); - } catch (Exception e) { - caughtEx.set(e); - } finally { - txReadDone.countDown(); - } - }); - - txThread.start(); - - // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Tx read done", true, done); - - // Wait for the read to complete. Since the shard never - // initialized, the Tx should - // have timed out and throw an appropriate exception cause. - try { - txReadFuture.get().get(5, TimeUnit.SECONDS); - } catch (ExecutionException e) { - assertTrue("Expected ReadFailedException cause: " + e.getCause(), - e.getCause() instanceof ReadFailedException); - final Throwable root = Throwables.getRootCause(e); - Throwables.throwIfUnchecked(root); - throw new RuntimeException(root); - } finally { - blockRecoveryLatch.countDown(); + txCohort.get().canCommit().get(10, TimeUnit.SECONDS); + fail("Expected NoShardLeaderException"); + } catch (final ExecutionException e) { + final String msg = "Unexpected exception: " + + Throwables.getStackTraceAsString(e.getCause()); + if (DistributedDataStore.class.equals(testParameter)) { + assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); } } - } - }; - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) - throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String shardName = "default"; - - // We don't want the shard to become the leader so prevent shard - // elections. - datastoreContextBuilder.customRaftPolicyImplementation( - "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); - - // The ShardManager uses the election timeout for FindPrimary so - // reset it low so it will timeout quickly. - datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) - .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); - - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, testName, false, shardName)) { - - final Object result = dataStore.getActorContext().executeOperation( - dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); - assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); - - // Create the write Tx. - DOMStoreWriteTransaction writeTxToClose = null; - try { - writeTxToClose = writeOnly ? dataStore.newWriteOnlyTransaction() - : dataStore.newReadWriteTransaction(); - final DOMStoreWriteTransaction writeTx = writeTxToClose; - assertNotNull("newReadWriteTransaction returned null", writeTx); - - // Do some modifications and ready the Tx on a separate - // thread. - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - final Thread txThread = new Thread(() -> { - try { - writeTx.write(TestModel.JUNK_PATH, - ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); - - txCohort.set(writeTx.ready()); - } catch (Exception e) { - caughtEx.set(e); - } finally { - txReady.countDown(); - } - }); - - txThread.start(); - - // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if (caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Tx ready", true, done); - - // Wait for the commit to complete. Since no shard - // leader was elected in time, the Tx - // should have timed out and throw an appropriate - // exception cause. - try { - txCohort.get().canCommit().get(10, TimeUnit.SECONDS); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " - + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { - assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } - } - } finally { - try { - if (writeTxToClose != null) { - writeTxToClose.close(); - } - } catch (Exception e) { - // FIXME TransactionProxy.close throws IllegalStateException: - // Transaction is ready, it cannot be closed - } + } finally { + try { + if (writeTxToClose != null) { + writeTxToClose.close(); } + } catch (Exception e) { + // FIXME TransactionProxy.close throws IllegalStateException: + // Transaction is ready, it cannot be closed } } - }; + } } @Test @@ -738,533 +703,500 @@ public class DistributedDataStoreIntegrationTest { @Test public void testTransactionAbort() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "transactionAbortIntegrationTest", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "transactionAbortIntegrationTest", "test-1")) { - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.canCommit().get(5, TimeUnit.SECONDS); - cohort.abort().get(5, TimeUnit.SECONDS); + cohort.abort().get(5, TimeUnit.SECONDS); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - } - } - }; + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } } @Test @SuppressWarnings("checkstyle:IllegalCatch") public void testTransactionChainWithSingleShard() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testTransactionChainWithSingleShard", "test-1")) { - - // 1. Create a Tx chain and write-only Tx - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); - - // 2. Write some data - final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, testNode); - - // 3. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - - // 4. Commit the Tx on another thread that first waits for - // the second read Tx. - final CountDownLatch continueCommit1 = new CountDownLatch(1); - final CountDownLatch commit1Done = new CountDownLatch(1); - final AtomicReference commit1Error = new AtomicReference<>(); - new Thread(() -> { - try { - continueCommit1.await(); - doCommit(cohort1); - } catch (Exception e) { - commit1Error.set(e); - } finally { - commit1Done.countDown(); - } - }).start(); - - // 5. Create a new read Tx from the chain to read and verify - // the data from the first - // Tx is visible after being readied. - DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", testNode, optional.get()); - - // 6. Create a new RW Tx from the chain, write more data, - // and ready it - final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); - rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); - - final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); - - // 7. Create a new read Tx from the chain to read the data - // from the last RW Tx to - // verify it is visible. - readTx = txChain.newReadWriteTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); - - // 8. Wait for the 2 commits to complete and close the - // chain. - continueCommit1.countDown(); - Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); - - if (commit1Error.get() != null) { - throw commit1Error.get(); - } + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testTransactionChainWithSingleShard", "test-1")) { + + // 1. Create a Tx chain and write-only Tx + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - doCommit(cohort2); + // 2. Write some data + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); - txChain.close(); + // 3. Ready the Tx for commit + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 9. Create a new read Tx from the data store and verify - // committed data. - readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + // 4. Commit the Tx on another thread that first waits for + // the second read Tx. + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread(() -> { + try { + continueCommit1.await(); + testKit.doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); } + }).start(); + + // 5. Create a new read Tx from the chain to read and verify + // the data from the first + // Tx is visible after being readied. + DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", testNode, optional.get()); + + // 6. Create a new RW Tx from the chain, write more data, + // and ready it + final DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + final MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + + final DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + + // 7. Create a new read Tx from the chain to read the data + // from the last RW Tx to + // verify it is visible. + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + + // 8. Wait for the 2 commits to complete and close the + // chain. + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + + if (commit1Error.get() != null) { + throw commit1Error.get(); } - }; + + testKit.doCommit(cohort2); + + txChain.close(); + + // 9. Create a new read Tx from the data store and verify + // committed data. + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + } } @Test public void testTransactionChainWithMultipleShards() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + final MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + final YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - final MapEntryNode person = PeopleModel.newPersonEntry("jack"); - final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.merge(personPath, person); + final MapEntryNode person = PeopleModel.newPersonEntry("jack"); + final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); - final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); - writeTx = txChain.newWriteOnlyTransaction(); + writeTx = txChain.newWriteOnlyTransaction(); - writeTx.delete(carPath); + writeTx.delete(carPath); - final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - final ListenableFuture canCommit1 = cohort1.canCommit(); - final ListenableFuture canCommit2 = cohort2.canCommit(); + final ListenableFuture canCommit1 = cohort1.canCommit(); + final ListenableFuture canCommit2 = cohort2.canCommit(); - doCommit(canCommit1, cohort1); - doCommit(canCommit2, cohort2); - doCommit(cohort3); + testKit.doCommit(canCommit1, cohort1); + testKit.doCommit(canCommit2, cohort2); + testKit.doCommit(cohort3); - txChain.close(); + txChain.close(); - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); - } - } - }; + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } } @Test public void testCreateChainedTransactionsInQuickSuccession() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - final List> futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); - final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - futures.add(writeTx.commit()); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.commit()); - int numCars = 100; - for (int i = 0; i < numCars; i++) { - final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + int numCars = 100; + for (int i = 0; i < numCars; i++) { + final DOMDataTreeReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - futures.add(rwTx.commit()); - } + futures.add(rwTx.commit()); + } - for (final ListenableFuture f : futures) { - f.get(5, TimeUnit.SECONDS); - } + for (final ListenableFuture f : futures) { + f.get(5, TimeUnit.SECONDS); + } - final Optional> optional = txChain.newReadOnlyTransaction() - .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + final Optional> optional = txChain.newReadOnlyTransaction() + .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); - txChain.close(); + txChain.close(); - broker.close(); - } - } - }; + broker.close(); + } } @Test public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); - rwTx1.ready(); + rwTx1.ready(); - final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get( - 5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get( + 5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - txChain.close(); - } - } - }; + txChain.close(); + } } @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - // Try to create another Tx of each type - each should fail - // b/c the previous Tx wasn't - // readied. - assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); - } - } - }; + // Try to create another Tx of each type - each should fail + // b/c the previous Tx wasn't + // readied. + testKit.assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + } } @Test public void testCreateChainedTransactionAfterClose() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - txChain.close(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + txChain.close(); - // Try to create another Tx of each type - should fail b/c - // the previous Tx was closed. - assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); - } - } - }; + // Try to create another Tx of each type - should fail b/c + // the previous Tx was closed. + testKit.assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + } } @Test public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - // Create a write tx and submit. - final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + // Create a write tx and submit. + final DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // Create read-only tx's and issue a read. - FluentFuture>> readFuture1 = txChain - .newReadOnlyTransaction().read(TestModel.TEST_PATH); + // Create read-only tx's and issue a read. + FluentFuture>> readFuture1 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); - FluentFuture>> readFuture2 = txChain - .newReadOnlyTransaction().read(TestModel.TEST_PATH); + FluentFuture>> readFuture2 = txChain + .newReadOnlyTransaction().read(TestModel.TEST_PATH); - // Create another write tx and issue the write. - DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); - writeTx2.write(TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + // Create another write tx and issue the write. + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .build()); - // Ensure the reads succeed. + // Ensure the reads succeed. - assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent()); - assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture1.get(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.get(5, TimeUnit.SECONDS).isPresent()); - // Ensure the writes succeed. - DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + // Ensure the writes succeed. + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); - doCommit(cohort1); - doCommit(cohort2); + testKit.doCommit(cohort1); + testKit.doCommit(cohort2); - assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) - .get(5, TimeUnit.SECONDS).isPresent()); - } - } - }; + assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH) + .get(5, TimeUnit.SECONDS).isPresent()); + } } @Test public void testChainedTransactionFailureWithSingleShard() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, - PeopleModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + try { + writeTx.commit().get(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (final ExecutionException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), + any(Throwable.class)); - txChain.close(); - broker.close(); - } - } - }; + txChain.close(); + broker.close(); + } } @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( - ImmutableMap.builder() - .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), - MoreExecutors.directExecutor()); + final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder() + .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), + MoreExecutors.directExecutor()); - final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, - PeopleModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, + PeopleModel.emptyContainer()); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - // Note that merge will validate the data and fail but put - // succeeds b/c deep validation is not - // done for put for performance reasons. - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + // Note that merge will validate the data and fail but put + // succeeds b/c deep validation is not + // done for put for performance reasons. + try { + writeTx.commit().get(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (final ExecutionException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), + any(Throwable.class)); - txChain.close(); - broker.close(); - } - } - }; + txChain.close(); + broker.close(); + } } @Test public void testDataTreeChangeListenerRegistration() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - ListenerRegistration listenerReg = dataStore - .registerTreeChangeListener(TestModel.TEST_PATH, listener); + ListenerRegistration listenerReg = dataStore + .registerTreeChangeListener(TestModel.TEST_PATH, listener); - assertNotNull("registerTreeChangeListener returned null", listenerReg); + assertNotNull("registerTreeChangeListener returned null", listenerReg); - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getTreeChangeListenerActors", 1, - state.getTreeChangeListenerActors().size())); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 1, + state.getTreeChangeListenerActors().size())); - // Wait for the initial notification - listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.reset(2); + // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); - // Write 2 updates. - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, - ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + // Write 2 updates. + testKit.testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME) + .build()); - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testWriteTransaction(dataStore, listPath, - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testKit.testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - // Wait for the 2 updates. - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listenerReg.close(); + // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); - IntegrationTestKit.verifyShardState(dataStore, "test-1", - state -> assertEquals("getTreeChangeListenerActors", 0, - state.getTreeChangeListenerActors().size())); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 0, + state.getTreeChangeListenerActors().size())); - testWriteTransaction(dataStore, - YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) - .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), - ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + testKit.testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - listener.expectNoMoreChanges("Received unexpected change after close"); - } - } - }; + listener.expectNoMoreChanges("Received unexpected change after close"); + } } @Test public void testRestoreFromDatastoreSnapshot() throws Exception { - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { - { - final String name = "transactionIntegrationTest"; + final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + final String name = "transactionIntegrationTest"; - final ContainerNode carsNode = CarsModel.newCarsNode( - CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), - CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + final ContainerNode carsNode = CarsModel.newCarsNode( + CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); - DataTree dataTree = new InMemoryDataTreeFactory().create( - DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); - AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + DataTree dataTree = new InMemoryDataTreeFactory().create( + DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - final Snapshot carsSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + final Snapshot carsSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); - dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, - SchemaContextHelper.full()); + dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, + SchemaContextHelper.full()); - final NormalizedNode peopleNode = PeopleModel.create(); - AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); + final NormalizedNode peopleNode = PeopleModel.create(); + AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.EMPTY); - final Snapshot peopleSnapshot = Snapshot.create( - new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), - Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); + final Snapshot peopleSnapshot = Snapshot.create( + new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1", null); - restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( - new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), - new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); + testKit.restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( + new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), + new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); - try (AbstractDataStore dataStore = setupAbstractDataStore( - testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { + try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( + testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { - final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - // two reads - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", carsNode, optional.get()); + // two reads + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); - optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", peopleNode, optional.get()); - } - } - }; + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); + } } } -- 2.36.6