Add unit test for FrontendMetadata memory leaks
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
index 0fb3a41c7d8fec2f002da4c9e22774ccac7b84fe..6fead6bbad2d0166d1183ba72146b45ee318139c 100644
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -30,6 +31,8 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -38,15 +41,18 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -58,7 +64,9 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -68,6 +76,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -118,7 +128,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+                { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
         });
     }
 
@@ -328,6 +338,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
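+        // Commit five cars one at a time, issuing a read-only transaction after each commit
+        // so the chain allocates both write and read transactions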
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i),
+                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+            followerTestKit.doCommit(writeTx.ready());
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        // wait for the shard to catch up with the purged transactions
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
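+                    // Fetch the frontend metadata tracked by the leader cars shard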
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
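+                        // The initial write plus the five writes and five reads on the chain
+                        // should all have been purged, leaving no closed transactions behind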
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+                                metadata.getPurgedTransactions().asRanges().iterator().next());
+                    } else {
+                        // the ask-based protocol should not track any metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
+    @Test
+    @Ignore("Flushes out a tell-based leak which needs to be handled separately")
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // The ask-based frontend seems to have some issues with back-to-back close
+        Assume.assumeTrue(TestClientBackedDataStore.class.isAssignableFrom(testParameter));
+
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
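+        // Allocate write transactions and close them immediately without readying them,
+        // interleaving read-only transactions, to exercise the frontend purge path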
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // wait for the shard to catch up with the purged transactions
+        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
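+                    // Fetch the frontend metadata tracked by the leader cars shard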
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
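+                        // Every transaction on the chain should have been purged, collapsing
+                        // into a single contiguous range with no closed transactions left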
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(1, ranges.size());
+                    } else {
+                        // the ask-based protocol should not track any metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        // no cars were committed in this test: the write transactions in the loop were closed, never readied
+        assertEquals("# cars", 0, ((Collection<?>) optional.get().getValue()).size());
+    }
+
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@@ -637,7 +772,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+        final Optional<ActorRef> carsFollowerShard =
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
@@ -704,7 +839,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
         followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
 
-        final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+        final Optional<ActorRef> carsFollowerShard =
                 followerDistributedDataStore.getActorUtils().findLocalShard("cars");
         assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
 
@@ -771,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@@ -883,8 +1018,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
@@ -948,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -1028,7 +1163,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
                         || e.getCause() instanceof ShardLeaderNotRespondingException);
             } else {
@@ -1067,7 +1202,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
             } else {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@@ -1157,7 +1292,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
         // The slicing is only implemented for tell-based protocol
-        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+        Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
 
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);