X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractDistributedDataStoreIntegrationTest.java;h=fa83bd010e936b6ed03e6a98411214578bc5bda9;hb=118cd0216b0c6b0ec1a01689ec2025a13e090861;hp=cbe494f06c91f5a20fcf155bf8fc13cc7cc0ddc4;hpb=b21572e7fcdc926c7839f2b936eb3f623304fd6c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index cbe494f06c..fa83bd010e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -18,12 +20,9 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; -import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -32,15 +31,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Ignore; import org.junit.Test; import org.junit.runners.Parameterized.Parameter; import org.mockito.Mockito; @@ -51,7 +47,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; @@ -81,11 +77,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.tree.api.DataTree; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration; +import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; public abstract class AbstractDistributedDataStoreIntegrationTest { @@ -154,7 +150,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // Verify the data in the store final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); - Optional> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); + Optional optional = readTx.read(carPath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); @@ -176,14 +172,14 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // 2. Write some data final YangInstanceIdentifier nodePath = TestModel.TEST_PATH; - final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); readWriteTx.write(nodePath, nodeToWrite); // 3. Read the data from Tx final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS); assertEquals("exists", Boolean.TRUE, exists); - Optional> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); + Optional optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", nodeToWrite, optional.get()); @@ -236,7 +232,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS); assertEquals("exists", Boolean.TRUE, exists); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + Optional optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); @@ -256,7 +252,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { } @Test - @Ignore("Flushes a closed tx leak in single node, needs to be handled separately") public void testSingleTransactionsWritesInQuickSuccession() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( @@ -272,43 +267,54 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); testKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); + } } - // verify frontend metadata has no holes in purged transactions causing overtime memory leak - Optional localShard = dataStore.getActorUtils().findLocalShard("cars-1"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); - - if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); - } - Set> ranges = metadata.getPurgedTransactions().asRanges(); + // wait to let the shard catch up with purged + await("transaction state propagation").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + // verify frontend metadata has no holes in purged transactions causing overtime memory leak + final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow(); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + + final var clientMeta = frontendMetadata.getClients().get(0); + if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + assertTellMetadata(clientMeta); + } else { + assertAskMetadata(clientMeta); + } + }); - assertEquals(1, ranges.size()); - } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); - } + final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH) + .get(5, TimeUnit.SECONDS) + .orElseThrow() + .body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals("# cars", numCars, ((Collection) body).size()); + } + } - final Optional> optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + private static void assertAskMetadata(final FrontendClientMetadata clientMeta) { + // ask based should track no metadata + assertEquals(List.of(), clientMeta.getCurrentHistories()); + } + + private static void assertTellMetadata(final FrontendClientMetadata clientMeta) { + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); } + assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString()); } @SuppressWarnings("checkstyle:IllegalCatch") @@ -445,7 +451,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { assertNotNull("newWriteOnlyTransaction returned null", writeTx); // 2. Write some data - final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); writeTx.write(TestModel.TEST_PATH, testNode); // 3. Ready the Tx for commit @@ -471,7 +477,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { // the data from the first // Tx is visible after being readied. DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction(); - Optional> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + Optional optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", testNode, optional.get()); @@ -544,7 +550,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack"); readWriteTx.merge(personPath, person); - Optional> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); + Optional optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", car, optional.get()); @@ -615,10 +621,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { f.get(5, TimeUnit.SECONDS); } - final Optional> optional = txChain.newReadOnlyTransaction() + final Optional optional = txChain.newReadOnlyTransaction() .read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); txChain.close(); @@ -640,7 +646,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreReadWriteTransaction rwTx2 = txChain.newReadWriteTransaction(); - final Optional> optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + final Optional optional = rwTx2.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); assertFalse("isPresent", optional.isPresent()); txChain.close(); @@ -696,10 +702,10 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); // Create read-only tx's and issue a read. - FluentFuture>> readFuture1 = txChain + FluentFuture> readFuture1 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); - FluentFuture>> readFuture2 = txChain + FluentFuture> readFuture2 = txChain .newReadOnlyTransaction().read(TestModel.TEST_PATH); // Create another write tx and issue the write. @@ -872,7 +878,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { DataTree dataTree = new InMemoryDataTreeFactory().create( DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); - NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty()); + NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty()); final Snapshot carsSnapshot = Snapshot.create( new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)), @@ -881,7 +887,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); - final NormalizedNode peopleNode = PeopleModel.create(); + final NormalizedNode peopleNode = PeopleModel.create(); AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty()); @@ -900,7 +906,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); // two reads - Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + Optional optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); assertTrue("isPresent", optional.isPresent()); assertEquals("Data node", carsNode, optional.get()); @@ -925,7 +931,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { ContainerNode rootNode = ImmutableContainerNodeBuilder.create() .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) - .withChild((ContainerNode) CarsModel.create()) + .withChild(CarsModel.create()) .build(); testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);