X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FAbstractDistributedDataStoreIntegrationTest.java;h=11f0e3d02a33834528f5947cee4a058603bacfd3;hp=cedd4ace0d165abb9771180c56c202cbad9fa65a;hb=9503012683559849cd2f01b882b57fdcbcca59f0;hpb=4e097d8d56a7e3814c63a69da183765fd4c78a56 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index cedd4ace0d..11f0e3d02a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -8,17 +8,19 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -30,14 +32,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Ignore; import org.junit.Test; import org.junit.runners.Parameterized.Parameter; import org.mockito.Mockito; @@ -48,7 +48,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; -import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata; import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; @@ -253,7 +253,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { } @Test - @Ignore("Flushes a closed tx leak in single node, needs to be handled separately") public void testSingleTransactionsWritesInQuickSuccession() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( @@ -269,41 +268,56 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { int numCars = 5; for (int i = 0; i < numCars; i++) { writeTx = txChain.newWriteOnlyTransaction(); - writeTx.write(CarsModel.newCarPath("car" + i), - CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); + writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000))); testKit.doCommit(writeTx.ready()); - DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); - domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); - - domStoreReadTransaction.close(); - } - - // verify frontend metadata has no holes in purged transactions causing overtime memory leak - Optional localShard = dataStore.getActorUtils().findLocalShard("cars-1"); - FrontendShardDataTreeSnapshotMetadata frontendMetadata = - (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() - .executeOperation(localShard.get(), new RequestFrontendMetadata()); - - if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { - Iterator iterator = - frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); - FrontendHistoryMetadata metadata = iterator.next(); - while (iterator.hasNext() && metadata.getHistoryId() != 1) { - metadata = iterator.next(); + try (var tx = txChain.newReadOnlyTransaction()) { + tx.read(CarsModel.BASE_PATH).get(); } - assertEquals(1, metadata.getPurgedTransactions().size()); - } else { - // ask based should track no metadata - assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); } - final Optional optional = txChain.newReadOnlyTransaction() - .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); - assertTrue("isPresent", optional.isPresent()); - assertEquals("# cars", numCars, ((Collection) optional.get().body()).size()); + // wait to let the shard catch up with purged + await("transaction state propagation").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + // verify frontend metadata has no holes in purged transactions causing overtime memory leak + final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow(); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() + .executeOperation(localShard, new RequestFrontendMetadata()); + + final var clientMeta = frontendMetadata.getClients().get(0); + if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + assertTellMetadata(clientMeta); + } else { + assertAskMetadata(clientMeta); + } + }); + + final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH) + .get(5, TimeUnit.SECONDS) + .orElseThrow() + .body(); + assertThat(body, instanceOf(Collection.class)); + assertEquals("# cars", numCars, ((Collection) body).size()); + } + } + + private static void assertAskMetadata(final FrontendClientMetadata clientMeta) { + // FIXME: needs to be enabled + assumeFalse(true); + // ask based should track no metadata + assertEquals(List.of(), clientMeta.getCurrentHistories()); + } + + private static void assertTellMetadata(final FrontendClientMetadata clientMeta) { + final var iterator = clientMeta.getCurrentHistories().iterator(); + var metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); } + assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString()); } @SuppressWarnings("checkstyle:IllegalCatch")