X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractDistributedDataStoreIntegrationTest.java;h=6563ff6eda2be505bdccb37edf525bfae3211936;hp=cbe494f06c91f5a20fcf155bf8fc13cc7cc0ddc4;hb=HEAD;hpb=b21572e7fcdc926c7839f2b936eb3f623304fd6c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index cbe494f06c..9f19ca045d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -8,23 +8,23 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -32,10 +32,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -43,15 +41,13 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Ignore; import org.junit.Test; import org.junit.runners.Parameterized.Parameter; -import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; +import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; -import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; @@ -67,31 +63,29 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; -import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener; import org.opendaylight.mdsal.dom.spi.store.DOMStore; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.Uint64; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; public abstract class AbstractDistributedDataStoreIntegrationTest { - @Parameter - public Class testParameter; + public Class testParameter; protected ActorSystem system; @@ -105,8 +99,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testWriteTransactionWithSingleShard() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "transactionIntegrationTest", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "transactionIntegrationTest", "test-1")) { testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); @@ -121,8 +114,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testWriteTransactionWithMultipleShards() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testWriteTransactionWithMultipleShards", + "cars-1", "people-1")) { DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -154,21 +147,16 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // Verify the data in the store final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } } @Test public void testReadWriteTransactionWithSingleShard() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithSingleShard", + "test-1")) { // 1. Create a read-write Tx final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); @@ -176,16 +164,14 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // 2. Write some data final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); readWriteTx.write(nodePath, nodeToWrite); // 3. Read the data from Tx final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); assertEquals("exists", Boolean.TRUE, exists); - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS)); // 4. Ready the Tx for commit final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready(); @@ -196,17 +182,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // 6. Verify the data in the store final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", nodeToWrite, optional.get()); + assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS)); } } @Test public void testReadWriteTransactionWithMultipleShards() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithMultipleShards", + "cars-1", "people-1")) { DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); @@ -236,35 +220,27 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); assertEquals("exists", Boolean.TRUE, exists); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); + assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS)); testKit.doCommit(readWriteTx.ready()); // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } } @Test - @Ignore("Flushes a closed tx leak in single node, needs to be handled separately") public void testSingleTransactionsWritesInQuickSuccession() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { + final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (var dataStore = testKit.setupDataStore(testParameter, "testSingleTransactionsWritesInQuickSuccession", + "cars-1")) { - final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + final var txChain = dataStore.createTransactionChain(); - DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + var writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); testKit.doCommit(writeTx.ready()); @@ -272,42 +248,39 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); testKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); - } - - // verify frontend metadata has no holes in purged transactions causing overtime memory leak - Optional localShard = dataStore.getActorUtils().findLocalShard("cars-1"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); - - if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); } - Set> ranges = metadata.getPurgedTransactions().asRanges(); - - assertEquals(1, ranges.size()); - } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); } - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + // wait to let the shard catch up with purged + await("transaction state propagation").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + // verify frontend metadata has no holes in purged transactions causing overtime memory leak + final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow(); + final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + + final var clientMeta = frontendMetadata.getClients().get(0); + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString()); + }); + + final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH) + .get(5, TimeUnit.SECONDS) + .orElseThrow() + .body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals("# cars", numCars, ((Collection) body).size()); } } @@ -327,8 +300,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) .shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) { - + try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) { final Object result = dataStore.getActorUtils().executeOperation( dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true)); assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); @@ -373,18 +345,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // leader was elected in time, the Tx // should have timed out and throw an appropriate // exception cause. - try { - txCohort.get().canCommit().get(10, TimeUnit.SECONDS); - fail("Expected NoShardLeaderException"); - } catch (final ExecutionException e) { - final String msg = "Unexpected exception: " - + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.isAssignableFrom(testParameter)) { - assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); - } else { - assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); - } - } + final var ex = assertThrows(ExecutionException.class, + () -> txCohort.get().canCommit().get(10, TimeUnit.SECONDS)); + assertTrue("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()), + Throwables.getRootCause(ex) instanceof RequestTimeoutException); } finally { try { if (writeTxToClose != null) { @@ -412,8 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testTransactionAbort() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "transactionAbortIntegrationTest", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "transactionAbortIntegrationTest", "test-1")) { final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); assertNotNull("newWriteOnlyTransaction returned null", writeTx); @@ -435,8 +398,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @SuppressWarnings("checkstyle:IllegalCatch") public void testTransactionChainWithSingleShard() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testTransactionChainWithSingleShard", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithSingleShard", "test-1")) { // 1. Create a Tx chain and write-only Tx final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -445,7 +407,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); // 2. Write some data - final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit @@ -471,9 +433,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // the data from the first // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", testNode, optional.get()); + assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS)); // 6. Create a new RW Tx from the chain, write more data, // and ready it @@ -489,9 +449,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // from the last RW Tx to // verify it is visible. readTx = txChain.newReadWriteTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS)); // 8. Wait for the 2 commits to complete and close the // chain. @@ -509,17 +467,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // 9. Create a new read Tx from the data store and verify // committed data. readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", outerNode, optional.get()); + assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS)); } } @Test public void testTransactionChainWithMultipleShards() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithMultipleShards", + "cars-1", "people-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -544,13 +500,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", car, optional.get()); - - optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS)); final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready(); @@ -571,28 +522,23 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); - assertFalse("isPresent", optional.isPresent()); - - optional = readTx.read(personPath).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", person, optional.get()); + assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS)); } } @Test public void testCreateChainedTransactionsInQuickSuccession() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionsInQuickSuccession", + "cars-1")) { final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - DOMTransactionChain txChain = broker.createTransactionChain(listener); + DOMTransactionChain txChain = broker.createTransactionChain(); final List> futures = new ArrayList<>(); @@ -615,10 +561,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { f.get(5, TimeUnit.SECONDS); } - final Optional> optional = txChain.newReadOnlyTransaction() + final Optional optional = txChain.newReadOnlyTransaction() .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + assertEquals("# cars", numCars, ((Collection) optional.orElseThrow().body()).size()); txChain.close(); @@ -629,8 +575,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", + "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -640,7 +586,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertFalse("isPresent", optional.isPresent()); txChain.close(); @@ -650,8 +596,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionWhenPreviousNotReady", + "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -670,8 +616,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testCreateChainedTransactionAfterClose() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testCreateChainedTransactionAfterClose", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterClose", + "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); txChain.close(); @@ -685,8 +631,8 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testChainWithReadOnlyTxAfterPreviousReady", + "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -696,10 +642,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); // Create read-only tx's and issue a read. - FluentFuture>> readFuture1 = txChain + FluentFuture> readFuture1 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); - FluentFuture>> readFuture2 = txChain + FluentFuture> readFuture2 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); // Create another write tx and issue the write. @@ -727,38 +673,34 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testChainedTransactionFailureWithSingleShard() throws Exception { - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { + final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); + try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithSingleShard", + "cars-1")) { - final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + final var broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final var listener = mock(FutureCallback.class); + final var txChain = broker.createTransactionChain(); + txChain.addCallback(listener); - final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction(); + final var writeTx = txChain.newReadWriteTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + final var invalidData = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build(); writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)); - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); + verify(listener, timeout(5000)).onFailure(any()); txChain.close(); broker.close(); @@ -768,40 +710,36 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithMultipleShards", + "cars-1", "people-1")) { final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( ImmutableMap.builder() .put(LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class); - final DOMTransactionChain txChain = broker.createTransactionChain(listener); + final var listener = mock(FutureCallback.class); + final DOMTransactionChain txChain = broker.createTransactionChain(); + txChain.addCallback(listener); final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); - final ContainerNode invalidData = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) - .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + final ContainerNode invalidData = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME)) + .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")) + .build(); writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); // Note that merge will validate the data and fail but put // succeeds b/c deep validation is not // done for put for performance reasons. - try { - writeTx.commit().get(5, TimeUnit.SECONDS); - fail("Expected TransactionCommitFailedException"); - } catch (final ExecutionException e) { - // Expected - } + assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS)); - verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), - any(Throwable.class)); + verify(listener, timeout(5000)).onFailure(any()); txChain.close(); broker.close(); @@ -811,16 +749,15 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { @Test public void testDataTreeChangeListenerRegistration() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { + try (var dataStore = testKit.setupDataStore(testParameter, "testDataTreeChangeListenerRegistration", + "test-1")) { testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); - ListenerRegistration listenerReg = dataStore - .registerTreeChangeListener(TestModel.TEST_PATH, listener); + final var listenerReg = dataStore.registerTreeChangeListener(TestModel.TEST_PATH, listener); assertNotNull("registerTreeChangeListener returned null", listenerReg); @@ -872,7 +809,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { DataTree dataTree = new InMemoryDataTreeFactory().create( DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty()); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of()); final Snapshot carsSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), @@ -881,10 +818,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); - final NormalizedNode peopleNode = PeopleModel.create(); + final NormalizedNode peopleNode = PeopleModel.create(); AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); - root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty()); + root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of()); final Snapshot peopleSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), @@ -894,41 +831,31 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { + try (var dataStore = testKit.setupDataStore(testParameter, name, "module-shards-member1.conf", true, + "cars", "people")) { final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); // two reads - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", carsNode, optional.get()); - - optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("Data node", peopleNode, optional.get()); + assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS)); + assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS)); } } @Test + @Ignore("ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate") + // FIXME: re-enable this test public void testSnapshotOnRootOverwrite() throws Exception { - if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { - // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate - return; - } - - final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), - datastoreContextBuilder.snapshotOnRootOverwrite(true)); - try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( - testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf", - true, "cars", "default")) { + final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true)); + try (var dataStore = testKit.setupDataStore(testParameter, "testRootOverwrite", + "module-shards-default-cars-member1.conf", true, "cars", "default")) { - ContainerNode rootNode = ImmutableContainerNodeBuilder.create() - .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) - .withChild((ContainerNode) CarsModel.create()) - .build(); + final var rootNode = Builders.containerBuilder() + .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME)) + .withChild(CarsModel.create()) + .build(); - testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode); IntegrationTestKit.verifyShardState(dataStore, "cars", state -> assertEquals(1, state.getSnapshotIndex())); @@ -948,7 +875,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1); // root overwrite so expect a snapshot - testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode); + testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode); // this was a real snapshot so everything should be in it(1 + 10 + 1) IntegrationTestKit.verifyShardState(dataStore, "cars",