From 6f3c16acf17d0cb4d5f44b666751f8db84a652be Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 16 May 2016 13:38:09 +0200 Subject: [PATCH] BUG-5280: make sure all DistributedDataStore instances are shut down Fix up instantiation sites to eventually close the data store instance they have created. Change-Id: Ib71531c0263197209db6ec7de4f6c92d60db3d1d Signed-off-by: Robert Varga --- .../DataTreeCohortIntegrationTest.java | 89 +- .../DistributedDataStoreIntegrationTest.java | 1157 ++++++++--------- ...butedDataStoreRemotingIntegrationTest.java | 149 ++- .../cluster/datastore/IntegrationTestKit.java | 7 - .../cluster/datastore/MemberNode.java | 9 +- ...DistributedEntityOwnershipServiceTest.java | 3 +- 6 files changed, 704 insertions(+), 710 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java index 84521e5ce6..720cd01ad6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java @@ -14,7 +14,6 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; - import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; @@ -91,25 +90,25 @@ public class DataTreeCohortIntegrationTest { ArgumentCaptor candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1"); - final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort); - Thread.sleep(1000); // Registration is asynchronous - assertNotNull(cohortReg); - testWriteTransaction(dataStore, TestModel.TEST_PATH, + try (final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest", "test-1")) { + final ObjectRegistration cohortReg = dataStore.registerCommitCohort(TEST_ID, cohort); + Thread.sleep(1000); // Registration is asynchronous + assertNotNull(cohortReg); + testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class)); - DOMDataTreeCandidate candidate = candidateCapt.getValue(); - assertNotNull(candidate); - assertEquals(TEST_ID, candidate.getRootPath()); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + Mockito.verify(cohort).canCommit(any(Object.class), candidateCapt.capture(), any(SchemaContext.class)); + DOMDataTreeCandidate candidate = candidateCapt.getValue(); + assertNotNull(candidate); + assertEquals(TEST_ID, candidate.getRootPath()); + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class), + Mockito.verify(cohort, Mockito.times(2)).canCommit(any(Object.class), any(DOMDataTreeCandidate.class), any(SchemaContext.class)); - cohortReg.close(); - testWriteTransaction(dataStore, TestModel.TEST_PATH, + cohortReg.close(); + testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - Mockito.verifyNoMoreInteractions(cohort); - cleanup(dataStore); + Mockito.verifyNoMoreInteractions(cohort); + } } }; } @@ -123,23 +122,23 @@ public class DataTreeCohortIntegrationTest { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - final DistributedDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1"); - dataStore.registerCommitCohort(TEST_ID, failedCohort); - Thread.sleep(1000); // Registration is asynchronous - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); - try { - // FIXME: Weird thing is that invoking canCommit on front-end invokes also - // preCommit on backend. - dsCohort.canCommit().get(); - fail("Exception should be raised."); - } catch (Exception e) { - assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e)); + try (final DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1")) { + dataStore.registerCommitCohort(TEST_ID, failedCohort); + Thread.sleep(1000); // Registration is asynchronous + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + try { + // FIXME: Weird thing is that invoking canCommit on front-end invokes also + // preCommit on backend. + dsCohort.canCommit().get(); + fail("Exception should be raised."); + } catch (Exception e) { + assertSame(FAILED_CAN_COMMIT, Throwables.getRootCause(e)); + } } - cleanup(dataStore); } }; } @@ -160,19 +159,19 @@ public class DataTreeCohortIntegrationTest { Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort(); new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - final DistributedDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1"); - dataStore.registerCommitCohort(TEST_ID, cohortToAbort); - Thread.sleep(1000); // Registration is asynchronous - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); - - dsCohort.canCommit().get(); - dsCohort.abort().get(); - Mockito.verify(stepToAbort, Mockito.times(1)).abort(); - cleanup(dataStore); + try (final DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1")) { + dataStore.registerCommitCohort(TEST_ID, cohortToAbort); + Thread.sleep(1000); // Registration is asynchronous + + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort dsCohort = writeTx.ready(); + + dsCohort.canCommit().get(); + dsCohort.abort().get(); + Mockito.verify(stepToAbort, Mockito.times(1)).abort(); + } } }; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index db8e2c29d9..9d86b8d9c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -110,65 +110,63 @@ public class DistributedDataStoreIntegrationTest { @Test public void testWriteTransactionWithSingleShard() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionIntegrationTest", "test-1"); + try (DistributedDataStore dataStore = + setupDistributedDataStore("transactionIntegrationTest", "test-1")) { - testWriteTransaction(dataStore, TestModel.TEST_PATH, + testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - - cleanup(dataStore); + } }}; } @Test public void testWriteTransactionWithMultipleShards() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1"); - - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + try (DistributedDataStore dataStore = + setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - doCommit(writeTx.ready()); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - writeTx = dataStore.newWriteOnlyTransaction(); + doCommit(writeTx.ready()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + writeTx = dataStore.newWriteOnlyTransaction(); - doCommit(writeTx.ready()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - writeTx = dataStore.newWriteOnlyTransaction(); + doCommit(writeTx.ready()); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - writeTx.write(carPath, car); + writeTx = dataStore.newWriteOnlyTransaction(); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - writeTx.write(personPath, person); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + writeTx.write(carPath, car); - doCommit(writeTx.ready()); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + writeTx.write(personPath, person); - // Verify the data in the store + doCommit(writeTx.ready()); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // Verify the data in the store - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - cleanup(dataStore); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } }}; } @@ -176,134 +174,132 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1"); - - // 1. Create a read-write Tx + try (DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1")) { - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + // 1. Create a read-write Tx - // 2. Write some data + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - readWriteTx.write(nodePath, nodeToWrite ); + // 2. Write some data - // 3. Read the data from Tx + YangInstanceIdentifier nodePath = TestModel.TEST_PATH; + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + readWriteTx.write(nodePath, nodeToWrite ); - Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + // 3. Read the data from Tx - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + Boolean exists = readWriteTx.exists(nodePath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - // 4. Ready the Tx for commit + Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); - DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); + // 4. Ready the Tx for commit - // 5. Commit the Tx + DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); - doCommit(cohort); + // 5. Commit the Tx - // 6. Verify the data in the store + doCommit(cohort); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // 6. Verify the data in the store - optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - cleanup(dataStore); + optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", nodeToWrite, optional.get()); + } }}; } @Test public void testReadWriteTransactionWithMultipleShards() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1"); + try (DistributedDataStore dataStore = + setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { - DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - readWriteTx = dataStore.newReadWriteTransaction(); + readWriteTx = dataStore.newReadWriteTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.write(personPath, person); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.write(personPath, person); - Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); - assertEquals("exists", true, exists); + Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS); + assertEquals("exists", true, exists); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - doCommit(readWriteTx.ready()); + doCommit(readWriteTx.ready()); - // Verify the data in the store + // Verify the data in the store - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); - cleanup(dataStore); + } }}; } @Test public void testSingleTransactionsWritesInQuickSuccession() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testSingleTransactionsWritesInQuickSuccession", "cars-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - doCommit(writeTx.ready()); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + doCommit(writeTx.ready()); - writeTx = txChain.newWriteOnlyTransaction(); + writeTx = txChain.newWriteOnlyTransaction(); - int nCars = 5; - for(int i = 0; i < nCars; i++) { - writeTx.write(CarsModel.newCarPath("car" + i), + int nCars = 5; + for(int i = 0; i < nCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - } + } - doCommit(writeTx.ready()); + doCommit(writeTx.ready()); - Optional> optional = txChain.newReadOnlyTransaction().read( + Optional> optional = txChain.newReadOnlyTransaction().read( CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); - - cleanup(dataStore); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + } }}; } @@ -318,83 +314,82 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - // Create the write Tx + // Create the write Tx - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + assertNotNull("newReadWriteTransaction returned null", writeTx); - // Do some modification operations and ready the Tx on a separate thread. + // Do some modification operations and ready the Tx on a separate thread. - final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder( + final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder( TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, - TestModel.ID_QNAME, 1).build(); - - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, + TestModel.ID_QNAME, 1).build(); + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder( + writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder( TestModel.OUTER_LIST_QNAME).build()); - writeTx.write(listEntryPath, ImmutableNodes.mapEntry( + writeTx.write(listEntryPath, ImmutableNodes.mapEntry( TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - writeTx.delete(listEntryPath); + writeTx.delete(listEntryPath); - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } } - } - }; + }; - txThread.start(); + txThread.start(); - // Wait for the Tx operations to complete. - - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } + // Wait for the Tx operations to complete. - assertEquals("Tx ready", true, done); + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } - // At this point the Tx operations should be waiting for the shard to initialize so - // trigger the latch to let the shard recovery to continue. + assertEquals("Tx ready", true, done); - blockRecoveryLatch.countDown(); + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. - // Wait for the Tx commit to complete. + blockRecoveryLatch.countDown(); - doCommit(txCohort.get()); + // Wait for the Tx commit to complete. - // Verify the data in the store + doCommit(txCohort.get()); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + // Verify the data in the store - Optional> optional = readTx.read(TestModel.TEST_PATH). - get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); + Optional> optional = readTx.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); - optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); - cleanup(dataStore); + optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + } }}; } @@ -421,64 +416,63 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - // Create the read-write Tx + // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // Do some reads on the Tx on a separate thread. + // Do some reads on the Tx on a separate thread. - final AtomicReference> txExistsFuture = - new AtomicReference<>(); - final AtomicReference>, ReadFailedException>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadsDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, + final AtomicReference> txExistsFuture = + new AtomicReference<>(); + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadsDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReadsDone.countDown(); + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadsDone.countDown(); + } } - } - }; + }; - txThread.start(); + txThread.start(); - // Wait for the Tx operations to complete. + // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } - - assertEquals("Tx reads done", true, done); + boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } - // At this point the Tx operations should be waiting for the shard to initialize so - // trigger the latch to let the shard recovery to continue. + assertEquals("Tx reads done", true, done); - blockRecoveryLatch.countDown(); + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. - // Wait for the reads to complete and verify. + blockRecoveryLatch.countDown(); - assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); - assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + // Wait for the reads to complete and verify. - readWriteTx.close(); + assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); - cleanup(dataStore); + readWriteTx.close(); + } }}; } @@ -500,56 +494,56 @@ public class DistributedDataStoreIntegrationTest { InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - // Create the write Tx + // Create the write Tx - final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); - // Do some modifications and ready the Tx on a separate thread. + // Do some modifications and ready the Tx on a separate thread. - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.TEST_PATH, + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } } - } - }; + }; - txThread.start(); + txThread.start(); - // Wait for the Tx operations to complete. + // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } - assertEquals("Tx ready", true, done); + assertEquals("Tx ready", true, done); - // Wait for the commit to complete. Since the shard never initialized, the Tx should - // have timed out and throw an appropriate exception cause. + // Wait for the commit to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch(ExecutionException e) { - throw e.getCause(); - } finally { - blockRecoveryLatch.countDown(); - cleanup(dataStore); + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + } } }}; } @@ -572,59 +566,59 @@ public class DistributedDataStoreIntegrationTest { InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - // Create the read-write Tx + // Create the read-write Tx - final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", readWriteTx); + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); - // Do a read on the Tx on a separate thread. + // Do a read on the Tx on a separate thread. - final AtomicReference>, ReadFailedException>> - txReadFuture = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReadDone = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - readWriteTx.write(TestModel.TEST_PATH, + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); - readWriteTx.close(); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReadDone.countDown(); + readWriteTx.close(); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadDone.countDown(); + } } - } - }; + }; - txThread.start(); + txThread.start(); - // Wait for the Tx operations to complete. + // Wait for the Tx operations to complete. - boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } + boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } - assertEquals("Tx read done", true, done); + assertEquals("Tx read done", true, done); - // Wait for the read to complete. Since the shard never initialized, the Tx should - // have timed out and throw an appropriate exception cause. + // Wait for the read to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. - try { - txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); - } catch(ReadFailedException e) { - throw e.getCause(); - } finally { - blockRecoveryLatch.countDown(); - cleanup(dataStore); + try { + txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); + } catch(ReadFailedException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + } } }}; } @@ -641,60 +635,60 @@ public class DistributedDataStoreIntegrationTest { datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1). shardInitializationTimeout(200, TimeUnit.MILLISECONDS); - DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) { - Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(), - new FindLocalShard(shardName, true)); - assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); + Object result = dataStore.getActorContext().executeOperation( + dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); - // Create the write Tx. + // Create the write Tx. - final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : - dataStore.newReadWriteTransaction(); - assertNotNull("newReadWriteTransaction returned null", writeTx); + try (final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : + dataStore.newReadWriteTransaction()) { + assertNotNull("newReadWriteTransaction returned null", writeTx); - // Do some modifications and ready the Tx on a separate thread. + // Do some modifications and ready the Tx on a separate thread. - final AtomicReference txCohort = new AtomicReference<>(); - final AtomicReference caughtEx = new AtomicReference<>(); - final CountDownLatch txReady = new CountDownLatch(1); - Thread txThread = new Thread() { - @Override - public void run() { - try { - writeTx.write(TestModel.JUNK_PATH, - ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); - - txCohort.set(writeTx.ready()); - } catch(Exception e) { - caughtEx.set(e); - return; - } finally { - txReady.countDown(); - } - } - }; + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.JUNK_PATH, + ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); - txThread.start(); + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; - // Wait for the Tx operations to complete. + txThread.start(); - boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); - if(caughtEx.get() != null) { - throw caughtEx.get(); - } + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } - assertEquals("Tx ready", true, done); + assertEquals("Tx ready", true, done); - // Wait for the commit to complete. Since no shard leader was elected in time, the Tx - // should have timed out and throw an appropriate exception cause. + // Wait for the commit to complete. Since no shard leader was elected in time, the Tx + // should have timed out and throw an appropriate exception cause. - try { - txCohort.get().canCommit().get(5, TimeUnit.SECONDS); - } catch(ExecutionException e) { - throw e.getCause(); - } finally { - cleanup(dataStore); + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } + } } }}; } @@ -713,455 +707,453 @@ public class DistributedDataStoreIntegrationTest { @Test public void testTransactionAbort() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); + try (DistributedDataStore dataStore = + setupDistributedDataStore("transactionAbortIntegrationTest", "test-1")) { - DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - cohort.canCommit().get(5, TimeUnit.SECONDS); + cohort.canCommit().get(5, TimeUnit.SECONDS); - cohort.abort().get(5, TimeUnit.SECONDS); + cohort.abort().get(5, TimeUnit.SECONDS); - testWriteTransaction(dataStore, TestModel.TEST_PATH, + testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - - cleanup(dataStore); + } }}; } @Test public void testTransactionChainWithSingleShard() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1"); + try (DistributedDataStore dataStore = + setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1")) { - // 1. Create a Tx chain and write-only Tx + // 1. Create a Tx chain and write-only Tx - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - // 2. Write some data + // 2. Write some data - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - writeTx.write(TestModel.TEST_PATH, testNode); + NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + writeTx.write(TestModel.TEST_PATH, testNode); - // 3. Ready the Tx for commit + // 3. Ready the Tx for commit - final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // 4. Commit the Tx on another thread that first waits for the second read Tx. + // 4. Commit the Tx on another thread that first waits for the second read Tx. - final CountDownLatch continueCommit1 = new CountDownLatch(1); - final CountDownLatch commit1Done = new CountDownLatch(1); - final AtomicReference commit1Error = new AtomicReference<>(); - new Thread() { - @Override - public void run() { - try { - continueCommit1.await(); - doCommit(cohort1); - } catch (Exception e) { - commit1Error.set(e); - } finally { - commit1Done.countDown(); + final CountDownLatch continueCommit1 = new CountDownLatch(1); + final CountDownLatch commit1Done = new CountDownLatch(1); + final AtomicReference commit1Error = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + try { + continueCommit1.await(); + doCommit(cohort1); + } catch (Exception e) { + commit1Error.set(e); + } finally { + commit1Done.countDown(); + } } - } - }.start(); + }.start(); - // 5. Create a new read Tx from the chain to read and verify the data from the first - // Tx is visible after being readied. + // 5. Create a new read Tx from the chain to read and verify the data from the first + // Tx is visible after being readied. - DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", testNode, optional.get()); + DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); + Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", testNode, optional.get()); - // 6. Create a new RW Tx from the chain, write more data, and ready it + // 6. Create a new RW Tx from the chain, write more data, and ready it - DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); - rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); + DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + MapNode outerNode = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(); + rwTx.write(TestModel.OUTER_LIST_PATH, outerNode); - DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); + DOMStoreThreePhaseCommitCohort cohort2 = rwTx.ready(); - // 7. Create a new read Tx from the chain to read the data from the last RW Tx to - // verify it is visible. + // 7. Create a new read Tx from the chain to read the data from the last RW Tx to + // verify it is visible. - readTx = txChain.newReadWriteTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + readTx = txChain.newReadWriteTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); - // 8. Wait for the 2 commits to complete and close the chain. + // 8. Wait for the 2 commits to complete and close the chain. - continueCommit1.countDown(); - Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); + continueCommit1.countDown(); + Uninterruptibles.awaitUninterruptibly(commit1Done, 5, TimeUnit.SECONDS); - if(commit1Error.get() != null) { - throw commit1Error.get(); - } - - doCommit(cohort2); + if(commit1Error.get() != null) { + throw commit1Error.get(); + } - txChain.close(); + doCommit(cohort2); - // 9. Create a new read Tx from the data store and verify committed data. + txChain.close(); - readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + // 9. Create a new read Tx from the data store and verify committed data. - cleanup(dataStore); + readTx = dataStore.newReadOnlyTransaction(); + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", outerNode, optional.get()); + } }}; } @Test public void testTransactionChainWithMultipleShards() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", - "cars-1", "people-1"); - - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards", + "cars-1", "people-1")) { - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode()); - DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); - readWriteTx.write(carPath, car); + DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction(); - MapEntryNode person = PeopleModel.newPersonEntry("jack"); - YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); - readWriteTx.merge(personPath, person); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier carPath = CarsModel.newCarPath("optima"); + readWriteTx.write(carPath, car); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", car, optional.get()); + MapEntryNode person = PeopleModel.newPersonEntry("jack"); + YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); + readWriteTx.merge(personPath, person); - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", car, optional.get()); - DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); + optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); - writeTx = txChain.newWriteOnlyTransaction(); + DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); - writeTx.delete(carPath); + writeTx = txChain.newWriteOnlyTransaction(); - DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); + writeTx.delete(carPath); - ListenableFuture canCommit1 = cohort1.canCommit(); - ListenableFuture canCommit2 = cohort2.canCommit(); + DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready(); - doCommit(canCommit1, cohort1); - doCommit(canCommit2, cohort2); - doCommit(cohort3); + ListenableFuture canCommit1 = cohort1.canCommit(); + ListenableFuture canCommit2 = cohort2.canCommit(); - txChain.close(); + doCommit(canCommit1, cohort1); + doCommit(canCommit2, cohort2); + doCommit(cohort3); - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + txChain.close(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", person, optional.get()); + optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - cleanup(dataStore); + optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", person, optional.get()); + } }}; } @Test public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionsInQuickSuccession", "cars-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - List> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - futures.add(writeTx.submit()); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.submit()); - int nCars = 100; - for(int i = 0; i < nCars; i++) { - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + int nCars = 100; + for(int i = 0; i < nCars; i++) { + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); - futures.add(rwTx.submit()); - } + futures.add(rwTx.submit()); + } - for(CheckedFuture f: futures) { - f.checkedGet(); - } + for(CheckedFuture f: futures) { + f.checkedGet(); + } - Optional> optional = txChain.newReadOnlyTransaction().read( + Optional> optional = txChain.newReadOnlyTransaction().read( LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); - - txChain.close(); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); - broker.close(); + txChain.close(); - cleanup(dataStore); + broker.close(); + } }}; } @Test public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterEmptyTxReadied", "test-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); + DOMStoreReadWriteTransaction rwTx1 = txChain.newReadWriteTransaction(); - rwTx1.ready(); + rwTx1.ready(); - DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); + DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", false, optional.isPresent()); + Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); - txChain.close(); - - cleanup(dataStore); + txChain.close(); + } }}; } @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionWhenPreviousNotReady", "test-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - assertNotNull("newWriteOnlyTransaction returned null", writeTx); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + assertNotNull("newWriteOnlyTransaction returned null", writeTx); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't - // readied. + // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't + // readied. - assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + assertExceptionOnTxChainCreates(txChain, IllegalStateException.class); + } }}; } @Test public void testCreateChainedTransactionAfterClose() throws Throwable { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionAfterClose", "test-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testCreateChainedTransactionAfterClose", "test-1")) { - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - txChain.close(); + txChain.close(); - // Try to create another Tx of each type - should fail b/c the previous Tx was closed. + // Try to create another Tx of each type - should fail b/c the previous Tx was closed. - assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class); + } }}; } @Test public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testChainWithReadOnlyTxAfterPreviousReady", "test-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); - // Create a write tx and submit. + // Create a write tx and submit. - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - // Create read-only tx's and issue a read. + // Create read-only tx's and issue a read. - CheckedFuture>, ReadFailedException> readFuture1 = - txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + CheckedFuture>, ReadFailedException> readFuture1 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); - CheckedFuture>, ReadFailedException> readFuture2 = - txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + CheckedFuture>, ReadFailedException> readFuture2 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); - // Create another write tx and issue the write. + // Create another write tx and issue the write. - DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); - writeTx2.write(TestModel.OUTER_LIST_PATH, + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - // Ensure the reads succeed. + // Ensure the reads succeed. - assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); - assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); - // Ensure the writes succeed. + // Ensure the writes succeed. - DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); - doCommit(cohort1); - doCommit(cohort2); + doCommit(cohort1); + doCommit(cohort2); - assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH). + assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH). checkedGet(5, TimeUnit.SECONDS).isPresent()); + } }}; } @Test public void testChainedTransactionFailureWithSingleShard() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithSingleShard", "cars-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithSingleShard", "cars-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - rwTx.submit().checkedGet(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { - // Expected - } + try { + rwTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class)); - txChain.close(); - broker.close(); - cleanup(dataStore); + txChain.close(); + broker.close(); + } }}; } @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = setupDistributedDataStore( - "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1"); + try (DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { - ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder().put( - LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); - DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); - writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); - // Note that merge will validate the data and fail but put succeeds b/c deep validation is not - // done for put for performance reasons. - writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + // Note that merge will validate the data and fail but put succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (TransactionCommitFailedException e) { - // Expected - } + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); - txChain.close(); - broker.close(); - cleanup(dataStore); + txChain.close(); + broker.close(); + } }}; } @Test public void testChangeListenerRegistration() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("testChangeListenerRegistration", "test-1"); + try (DistributedDataStore dataStore = + setupDistributedDataStore("testChangeListenerRegistration", "test-1")) { - testWriteTransaction(dataStore, TestModel.TEST_PATH, + testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - MockDataChangeListener listener = new MockDataChangeListener(1); + MockDataChangeListener listener = new MockDataChangeListener(1); - ListenerRegistration - listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, - DataChangeScope.SUBTREE); + ListenerRegistration + listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, + DataChangeScope.SUBTREE); - assertNotNull("registerChangeListener returned null", listenerReg); + assertNotNull("registerChangeListener returned null", listenerReg); - // Wait for the initial notification + // Wait for the initial notification - listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.waitForChangeEvents(TestModel.TEST_PATH); - listener.reset(2); + listener.reset(2); - // Write 2 updates. + // Write 2 updates. - testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); - YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). - nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); - testWriteTransaction(dataStore, listPath, + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - // Wait for the 2 updates. + // Wait for the 2 updates. - listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); - listenerReg.close(); + listenerReg.close(); - testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH). nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - listener.expectNoMoreChanges("Received unexpected change after close"); - - cleanup(dataStore); + listener.expectNoMoreChanges("Received unexpected change after close"); + } }}; } @@ -1196,20 +1188,19 @@ public class DistributedDataStoreIntegrationTest { new DatastoreSnapshot.ShardSnapshot("people", org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot)))); - DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", - true, "cars", "people"); + try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + true, "cars", "people")) { - DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", carsNode, optional.get()); + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); - optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertEquals("isPresent", true, optional.isPresent()); - assertEquals("Data node", peopleNode, optional.get()); - - cleanup(dataStore); + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); + } }}; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 132ac47e7d..0c7575a61d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -240,10 +240,10 @@ public class DistributedDataStoreRemotingIntegrationTest { ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2")); - DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). - setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE); - - verifyCars(member2Datastore.newReadOnlyTransaction(), car2); + try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder). + setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) { + verifyCars(member2Datastore.newReadOnlyTransaction(), car2); + } JavaTestKit.shutdownActorSystem(newSystem); } @@ -526,21 +526,24 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder); - newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS); - followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); + try (DistributedDataStore ds = + newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { - // Write a car entry to the new leader - should switch to local Tx + followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS); - writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + // Write a car entry to the new leader - should switch to local Tx - MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); - writeTx.merge(car1Path, car1); + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - followerTestKit.doCommit(writeTx.ready()); + MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima"); + writeTx.merge(car1Path, car1); + + followerTestKit.doCommit(writeTx.ready()); - verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1); + } } @SuppressWarnings("unchecked") @@ -795,61 +798,62 @@ public class DistributedDataStoreRemotingIntegrationTest { initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder); - DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, - MODULE_SHARDS_CARS_PEOPLE_1_2_3, false); - - // Create and submit a couple tx's so they're pending. - - DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); - writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); - - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { - @Override - public void verify(ShardStats stats) { - assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); - } - }); - - writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); - MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); - writeTx.write(CarsModel.newCarPath("optima"), car); - DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); - - IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { - @Override - public void verify(ShardStats stats) { - assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); - } - }); - - // Gracefully stop the leader via a Shutdown message. - - sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. + try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName, + MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { + + // Create and submit a couple tx's so they're pending. + + DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()); + } + }); + + writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); + MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)); + writeTx.write(CarsModel.newCarPath("optima"), car); + DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready(); + + IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() { + @Override + public void verify(ShardStats stats) { + assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()); + } + }); + + // Gracefully stop the leader via a Shutdown message. + + sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder. shardElectionTimeoutFactor(100)); - FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); - Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); - ActorRef leaderActor = Await.result(future, duration); + FiniteDuration duration = FiniteDuration.create(5, TimeUnit.SECONDS); + Future future = leaderDistributedDataStore.getActorContext().findLocalShardAsync("cars"); + ActorRef leaderActor = Await.result(future, duration); - Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); + Future stopFuture = Patterns.gracefulStop(leaderActor, duration, Shutdown.INSTANCE); - // Commit the 2 transactions. They should finish and succeed. + // Commit the 2 transactions. They should finish and succeed. - followerTestKit.doCommit(cohort1); - followerTestKit.doCommit(cohort2); + followerTestKit.doCommit(cohort1); + followerTestKit.doCommit(cohort2); - // Wait for the leader actor stopped. + // Wait for the leader actor stopped. - Boolean stopped = Await.result(stopFuture, duration); - assertEquals("Stopped", Boolean.TRUE, stopped); + Boolean stopped = Await.result(stopFuture, duration); + assertEquals("Stopped", Boolean.TRUE, stopped); - // Verify leadership was transferred by reading the committed data from the other nodes. + // Verify leadership was transferred by reading the committed data from the other nodes. - verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); - verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car); + verifyCars(follower2DistributedDataStore.newReadOnlyTransaction(), car); + } } @Test @@ -960,28 +964,31 @@ public class DistributedDataStoreRemotingIntegrationTest { DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder(). shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder); - follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS); - followerTestKit.waitForMembersUp("member-1", "member-3"); - follower2TestKit.waitForMembersUp("member-1", "member-2"); + try (DistributedDataStore ds = + follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) { - // Do an initial read to get the primary shard info cached. + followerTestKit.waitForMembersUp("member-1", "member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); - DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + // Do an initial read to get the primary shard info cached. - // Shutdown the leader and try to create a new tx. + DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); + readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + // Shutdown the leader and try to create a new tx. - sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. + JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + + sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder. operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); - DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); - rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); - followerTestKit.doCommit(rwTx.ready()); + followerTestKit.doCommit(rwTx.ready()); + } } private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index feb8f7eaef..afffa99e0c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -12,7 +12,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.PoisonPill; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; @@ -216,12 +215,6 @@ public class IntegrationTestKit extends ShardTestKit { cohort.commit().get(5, TimeUnit.SECONDS); } - public void cleanup(DistributedDataStore dataStore) { - if(dataStore != null) { - dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); - } - } - void assertExceptionOnCall(Callable callable, Class expType) throws Exception { try { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java index d018ce4a91..de751beeb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java @@ -112,8 +112,13 @@ public class MemberNode { public void cleanup() { if(!cleanedUp) { cleanedUp = true; - kit.cleanup(configDataStore); - kit.cleanup(operDataStore); + if (configDataStore != null) { + configDataStore.close(); + } + if (operDataStore != null) { + operDataStore.close(); + } + IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index c052134772..9343f381e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -28,7 +28,6 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityTypeEntryWithEntityEntry; import akka.actor.ActorRef; -import akka.actor.PoisonPill; import com.google.common.base.Optional; import com.google.common.collect.Sets; import java.util.Collection; @@ -104,7 +103,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh @After public void tearDown() { - dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender()); + dataStore.close(); } private static T verifyMessage(final DistributedEntityOwnershipService mock, final Class type) { -- 2.36.6