Partially enable testSingleTransactionsWritesInQuickSuccession() 80/98380/6
author Robert Varga <robert.varga@pantheon.tech>
Tue, 9 Nov 2021 20:02:31 +0000 (21:02 +0100)
committer Robert Varga <robert.varga@pantheon.tech>
Wed, 10 Nov 2021 11:30:34 +0000 (12:30 +0100)
Split the asserts into two methods, one for each datastore protocol
option. The ask-based path remains disabled because it currently
fails the test.
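
For reference, a minimal sketch of the split-and-gate pattern, using
JUnit 4's Assume to skip rather than fail the disabled path (the class
and helper names here are illustrative, not the actual test code):

    import static org.junit.Assume.assumeFalse;

    import org.junit.Test;

    public class ProtocolGateExample {
        @Test
        public void testBothProtocolPaths() {
            if (useTellBasedProtocol()) {
                assertTellPath();
            } else {
                assertAskPath();
            }
        }

        private static void assertAskPath() {
            // throws AssumptionViolatedException, so the runner reports
            // the test as skipped instead of failed
            assumeFalse("ask-based path does not pass yet", true);
        }

        private static void assertTellPath() {
            // protocol-specific assertions would go here
        }

        private static boolean useTellBasedProtocol() {
            return true; // stand-in for the datastore context lookup
        }
    }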

Also improve tell-based protocol assertions, which need to wait
for the purge process to settle.
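
The wait is implemented with Awaitility; below is a minimal,
self-contained sketch of the retry-assert idiom the diff uses (the
settled flag stands in for the shard completing its purges):

    import static org.awaitility.Awaitility.await;
    import static org.junit.Assert.assertTrue;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class AwaitSettleExample {
        public static void main(final String[] args) {
            final AtomicBoolean settled = new AtomicBoolean();
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                settled.set(true); // stand-in for the purge completing
            }).start();

            // re-runs the assertion every 500 ms until it passes or 5 s
            // elapse; on timeout the last AssertionError is rethrown
            await("purge settled").atMost(5, TimeUnit.SECONDS)
                .pollInterval(500, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> assertTrue(settled.get()));
        }
    }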

Change-Id: I8b2f3d84b2c7cd01dec4f7994eda716d022a98aa
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java

index cedd4ace0d165abb9771180c56c202cbad9fa65a..11f0e3d02a33834528f5947cee4a058603bacfd3 100644 (file)
@@ -8,17 +8,19 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -30,14 +32,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runners.Parameterized.Parameter;
 import org.mockito.Mockito;
@@ -48,7 +48,7 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderExc
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
@@ -253,7 +253,6 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     }
 
     @Test
-    @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
@@ -269,41 +268,56 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             int numCars = 5;
             for (int i = 0; i < numCars; i++) {
                 writeTx = txChain.newWriteOnlyTransaction();
-                writeTx.write(CarsModel.newCarPath("car" + i),
-                    CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
+                writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, Uint64.valueOf(20000)));
 
                 testKit.doCommit(writeTx.ready());
 
-                DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
-                domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
-
-                domStoreReadTransaction.close();
-            }
-
-            // verify frontend metadata has no holes in purged transactions causing overtime memory leak
-            Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
-            FrontendShardDataTreeSnapshotMetadata frontendMetadata =
-                    (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
-                            .executeOperation(localShard.get(), new RequestFrontendMetadata());
-
-            if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
-                Iterator<FrontendHistoryMetadata> iterator =
-                        frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
-                FrontendHistoryMetadata metadata = iterator.next();
-                while (iterator.hasNext() && metadata.getHistoryId() != 1) {
-                    metadata = iterator.next();
+                try (var tx = txChain.newReadOnlyTransaction()) {
+                    tx.read(CarsModel.BASE_PATH).get();
                 }
-                assertEquals(1, metadata.getPurgedTransactions().size());
-            } else {
-                // ask based should track no metadata
-                assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
             }
 
-            final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
-                    .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
-            assertTrue("isPresent", optional.isPresent());
-            assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
+            // wait to let the shard catch up with the purged transactions
+            await("transaction state propagation").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    // verify frontend metadata has no holes in purged transactions, which would leak memory over time
+                    final var localShard = dataStore.getActorUtils().findLocalShard("cars-1").orElseThrow();
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                        (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                            .executeOperation(localShard, new RequestFrontendMetadata());
+
+                    final var clientMeta = frontendMetadata.getClients().get(0);
+                    if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        assertTellMetadata(clientMeta);
+                    } else {
+                        assertAskMetadata(clientMeta);
+                    }
+                });
+
+            final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
+                .get(5, TimeUnit.SECONDS)
+                .orElseThrow()
+                .body();
+            assertThat(body, instanceOf(Collection.class));
+            assertEquals("# cars", numCars, ((Collection<?>) body).size());
+        }
+    }
+
+    private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
+        // FIXME: needs to be enabled
+        assumeFalse(true);
+        // ask based should track no metadata
+        assertEquals(List.of(), clientMeta.getCurrentHistories());
+    }
+
+    private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
+        final var iterator = clientMeta.getCurrentHistories().iterator();
+        var metadata = iterator.next();
+        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+            metadata = iterator.next();
         }
+        assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")