Add unit test for FrontendMetadata memory leaks
author     Tomas Cere <tomas.cere@pantheon.tech>
           Thu, 9 May 2019 09:12:34 +0000 (11:12 +0200)
committer  Robert Varga <robert.varga@pantheon.tech>
           Mon, 29 Jul 2019 09:53:25 +0000 (11:53 +0200)
This memory leak occurs when write-only and read-only transactions
intertwine, leading to a sparse purged-transaction range set.
We need to be able to request frontend metadata from shards,
so we can check whether it converges to a single range.
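
As a minimal sketch of why a single range is the healthy shape (this
assumes only Guava RangeSet semantics; TreeRangeSet stands in here for
whatever backs the purged-transaction set exposed via
getPurgedTransactions().asRanges()):

    import com.google.common.collect.Range;
    import com.google.common.collect.TreeRangeSet;
    import com.google.common.primitives.UnsignedLong;

    // Adjacent purged ranges coalesce: [0,5) + [5,11) merge into [0,11)
    TreeRangeSet<UnsignedLong> healthy = TreeRangeSet.create();
    healthy.add(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(5)));
    healthy.add(Range.closedOpen(UnsignedLong.valueOf(5), UnsignedLong.valueOf(11)));
    // healthy.asRanges().size() == 1 -- the converged case the tests assert

    // A hole at transaction 5 keeps two disjoint ranges alive indefinitely
    TreeRangeSet<UnsignedLong> sparse = TreeRangeSet.create();
    sparse.add(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(5)));
    sparse.add(Range.closedOpen(UnsignedLong.valueOf(6), UnsignedLong.valueOf(11)));
    // sparse.asRanges().size() == 2 -- the leaking shape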

Also rework the testing datastores so we can inject custom test
implementations that let us retrieve the frontend metadata.
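
A condensed sketch of the retrieval path the new tests use (names are
taken from the diff below; "cars" is just the example shard the tests
operate on):

    // Ask the local shard actor for a snapshot of its frontend metadata
    ActorRef shard = dataStore.getActorUtils().findLocalShard("cars").get();
    FrontendShardDataTreeSnapshotMetadata metadata =
            (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
                    .executeOperation(shard, new TestShard.RequestFrontendMetadata());
    // A single purged range means reads and writes converged cleanly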

JIRA: CONTROLLER-1879
Change-Id: Ia8f350be7831b0c8f6846ee6fa41a665d1e191f4
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java [new file with mode: 0644]

index c10cbca772a4dc2841e165590c71a70f21d994fb..0593b656650b9528ab8dd4c6492dda2421e9e43b 100644 (file)
       <version>3.1.5</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>3.1.6</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
index 55108e0ef9cd720895b04d560145022b3cdae703..43314cd3ba0c7ab84987ea2a695533f852bf850a 100644 (file)
@@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
@@ -89,7 +90,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
 
-        ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+        AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
                 .datastoreContextFactory(datastoreContextFactory)
                 .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
                 .primaryShardInfoCache(primaryShardInfoCache)
@@ -147,6 +148,10 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
                 .duration().toMillis() * READY_WAIT_FACTOR;
     }
 
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new ShardManagerCreator();
+    }
+
     protected final DataStoreClient getClient() {
         return client;
     }
@@ -253,8 +258,9 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
-            final String shardDispatcher, final String shardManagerId) {
+    private static ActorRef createShardManager(final ActorSystem actorSystem,
+            final AbstractShardManagerCreator<?> creator, final String shardDispatcher,
+            final String shardManagerId) {
         Exception lastException = null;
 
         for (int i = 0; i < 100; i++) {
index d02bd32099adfb0b2d3accd29edd404a7b854cf5..a7b89e18003844e96ed3e097381786dda4714b99 100644 (file)
@@ -190,7 +190,9 @@ public class Shard extends RaftActor {
 
     private final ShardTransactionMessageRetrySupport messageRetrySupport;
 
-    private final FrontendMetadata frontendMetadata;
+    @VisibleForTesting
+    final FrontendMetadata frontendMetadata;
+
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
     private boolean paused;
 
@@ -1084,16 +1086,17 @@ public class Shard extends RaftActor {
     }
 
     public abstract static class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
-        private final Class<S> shardClass;
+        private final Class<? extends S> shardClass;
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
         private SchemaContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private DataTree dataTree;
+
         private volatile boolean sealed;
 
-        protected AbstractBuilder(final Class<S> shardClass) {
+        protected AbstractBuilder(final Class<? extends S> shardClass) {
             this.shardClass = shardClass;
         }
 
@@ -1194,7 +1197,11 @@ public class Shard extends RaftActor {
 
     public static class Builder extends AbstractBuilder<Builder, Shard> {
         Builder() {
-            super(Shard.class);
+            this(Shard.class);
+        }
+
+        Builder(Class<? extends Shard> shardClass) {
+            super(shardClass);
         }
     }
 
index 97cbc5b0e00eb2afc9bf1491181ce0120a6f64b8..c1ccf65772e2575deed748822ac12bf404ab741c 100644 (file)
@@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.util.HashSet;
@@ -280,6 +281,11 @@ final class ShardInformation {
         schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
     }
 
+    @VisibleForTesting
+    Shard.AbstractBuilder<?, ?> getBuilder() {
+        return builder;
+    }
+
     @Override
     public String toString() {
         return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
index e1752cc57d5c8347948887c86c06fb9162e93ca8..43446b5ce2ce760d13c29a92e0a38c79acd74d9b 100644 (file)
@@ -137,7 +137,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // Stores a mapping between a shard name and its corresponding information
     // Shard names look like inventory, topology etc and are as specified in
     // configuration
-    private final Map<String, ShardInformation> localShards = new HashMap<>();
+    @VisibleForTesting
+    final Map<String, ShardInformation> localShards = new HashMap<>();
 
     // The type of a ShardManager reflects the type of the datastore itself
     // A data store could be of type config/operational
@@ -147,7 +148,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Configuration configuration;
 
-    private final String shardDispatcherPath;
+    @VisibleForTesting
+    final String shardDispatcherPath;
 
     private final ShardManagerInfo shardManagerMBean;
 
@@ -157,7 +159,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
 
-    private final ShardPeerAddressResolver peerAddressResolver;
+    @VisibleForTesting
+    final ShardPeerAddressResolver peerAddressResolver;
 
     private SchemaContext schemaContext;
 
@@ -1279,18 +1282,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
 
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
-            localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
-                    newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
-                        shardSnapshots.get(shardName)), peerAddressResolver));
+            localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+                    newShardDatastoreContext(shardName), shardSnapshots));
         }
     }
 
+    @VisibleForTesting
+    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
+                                        Map<String, String> peerAddresses,
+                                        DatastoreContext datastoreContext,
+                                        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
     /**
      * Given the name of the shard, find the addresses of all its peers.
      *
      * @param shardName the shard name
      */
-    private Map<String, String> getPeerAddresses(final String shardName) {
+    Map<String, String> getPeerAddresses(final String shardName) {
         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
         return getPeerAddresses(shardName, members);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java
new file mode 100644 (file)
index 0000000..919e4d8
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+public class TestClientBackedDataStore extends ClientBackedDataStore {
+    public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+                                     final Configuration configuration,
+                                     final DatastoreContextFactory datastoreContextFactory,
+                                     final DatastoreSnapshot restoreFromSnapshot) {
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+    }
+
+    TestClientBackedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier,
+                              final DataStoreClient clientActor) {
+        super(actorUtils, identifier, clientActor);
+    }
+
+    @Override
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new TestShardManager.TestShardManagerCreator();
+    }
+}
index 62986b2ebfe86f57a378d973e1be36e01eb27c29..b7c863a104a4cecb9d508eab4236cfb0d5447b7b 100644 (file)
@@ -12,15 +12,17 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -30,20 +32,27 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
@@ -244,6 +253,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
     }
 
     @Test
+    @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
@@ -256,15 +266,40 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
             writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
             testKit.doCommit(writeTx.ready());
 
-            writeTx = txChain.newWriteOnlyTransaction();
-
             int numCars = 5;
             for (int i = 0; i < numCars; i++) {
+                writeTx = txChain.newWriteOnlyTransaction();
                 writeTx.write(CarsModel.newCarPath("car" + i),
                     CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+                testKit.doCommit(writeTx.ready());
+
+                DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+                domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+                domStoreReadTransaction.close();
             }
 
-            testKit.doCommit(writeTx.ready());
+            // verify frontend metadata has no holes in purged transactions, which would cause a memory leak over time
+            Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
+            FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                    (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+                            .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+            if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                Iterator<FrontendHistoryMetadata> iterator =
+                        frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                FrontendHistoryMetadata metadata = iterator.next();
+                while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                    metadata = iterator.next();
+                }
+                Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                assertEquals(1, ranges.size());
+            } else {
+                // the ask-based protocol should track no metadata
+                assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+            }
 
             final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
                     .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
@@ -341,7 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest {
                 } catch (final ExecutionException e) {
                     final String msg = "Unexpected exception: "
                             + Throwables.getStackTraceAsString(e.getCause());
-                    if (DistributedDataStore.class.equals(testParameter)) {
+                    if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                         assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
                     } else {
                         assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
index d984535db811a1e5ad066c62a80bee5e5617dad3..9f88ae4c486ca8be65672ba2a20185faf709fb72 100644 (file)
@@ -35,7 +35,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@@ -55,7 +55,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractDistributedData
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class }, { ClientBackedDataStore.class }
+                { TestDistributedDataStore.class }, { TestClientBackedDataStore.class }
         });
     }
 
index 04217d194413184c3e3d83ef97216ffb665cbf83..6fead6bbad2d0166d1183ba72146b45ee318139c 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -30,6 +31,8 @@ import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -38,15 +41,18 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -58,7 +64,9 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -68,6 +76,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@@ -118,7 +128,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+                { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
         });
     }
 
@@ -328,6 +338,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i),
+                    CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+            followerTestKit.doCommit(writeTx.ready());
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        // wait to let the shard catch up with purged transactions
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+                                metadata.getPurgedTransactions().asRanges().iterator().next());
+                    } else {
+                        // the ask-based protocol should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
+    @Test
+    @Ignore("Flushes out tell based leak needs to be handled separately")
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // The ask-based frontend seems to have issues with back-to-back closes
+        Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // wait to let the shard catch up with purged transactions
+        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(1, ranges.size());
+                    } else {
+                        // the ask-based protocol should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
     @Test
     public void testReadWriteTransactionWithSingleShard() throws Exception {
         initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@@ -771,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
         leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
         initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@@ -883,8 +1018,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testLeadershipTransferOnShutdown() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
         followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
         final String testName = "testLeadershipTransferOnShutdown";
@@ -948,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Exception {
-        //TODO remove when test passes also for ClientBackedDataStore
-        Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+        // FIXME: remove when test passes also for ClientBackedDataStore
+        Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
         // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
         leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         final String testName = "testTransactionWithIsolatedLeader";
@@ -1028,7 +1163,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
                         || e.getCause() instanceof ShardLeaderNotRespondingException);
             } else {
@@ -1067,7 +1202,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             fail("Exception expected");
         } catch (final ExecutionException e) {
             final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
-            if (DistributedDataStore.class.equals(testParameter)) {
+            if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
             } else {
                 assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@@ -1157,7 +1292,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Test
     public void testReadWriteMessageSlicing() throws Exception {
         // The slicing is only implemented for tell-based protocol
-        Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+        Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
 
         leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
         followerDatastoreContextBuilder.maximumMessageSliceSize(100);
index 73888de3909c9a919eac3b5c9d8c7c6a501164c5..0090bf3a1b11c356b871660cb9a2eb19284fbe71 100644 (file)
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
@@ -53,7 +54,8 @@ public class DistributedDataStoreWithSegmentedJournalIntegrationTest
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class }});
+                { TestDistributedDataStore.class }, { TestClientBackedDataStore.class }
+        });
     }
 
     @Before
index 8dded6dd8dd3d1af1efffdfa1d04df13d07d775e..2cb63ce79c49187d0b4a06a49bea9323c843ef20 100644 (file)
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
@@ -132,6 +133,9 @@ public class IntegrationTestKit extends ShardTestKit {
 
         setDataStoreName(typeName);
 
+        // Make sure the datastore context matches the implementation: client-backed stores use the tell-based protocol
+        datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
+
         final DatastoreContext datastoreContext = datastoreContextBuilder.build();
         final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java
new file mode 100644 (file)
index 0000000..3606414
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+public class TestDistributedDataStore extends DistributedDataStore {
+
+    public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+                                    final Configuration configuration,
+                                    final DatastoreContextFactory datastoreContextFactory,
+                                    final DatastoreSnapshot restoreFromSnapshot) {
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+    }
+
+    TestDistributedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier) {
+        super(actorUtils, identifier);
+    }
+
+    @Override
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new TestShardManager.TestShardManagerCreator();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java
new file mode 100644 (file)
index 0000000..b0e744a
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
+
+public class TestShard extends Shard {
+    // Message to request FrontendMetadata
+    public static final class RequestFrontendMetadata {
+
+    }
+
+    protected TestShard(AbstractBuilder<?, ?> builder) {
+        super(builder);
+    }
+
+    @Override
+    protected void handleNonRaftCommand(Object message) {
+        if (message instanceof RequestFrontendMetadata) {
+            FrontendShardDataTreeSnapshotMetadata metadataSnapshot = frontendMetadata.toSnapshot();
+            sender().tell(metadataSnapshot, self());
+        } else {
+            super.handleNonRaftCommand(message);
+        }
+    }
+
+    public static Shard.Builder builder() {
+        return new TestShard.Builder();
+    }
+
+    public static class Builder extends Shard.Builder {
+        Builder() {
+            super(TestShard.class);
+        }
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java
new file mode 100644 (file)
index 0000000..0783c16
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.TestShard;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+
+public class TestShardManager extends ShardManager {
+    TestShardManager(AbstractShardManagerCreator<?> builder) {
+        super(builder);
+    }
+
+    /**
+     * Plug into shard actor creation to replace info with our testing one.
+     * @param info shard info.
+     * @return actor for replaced shard info.
+     */
+    @Override
+    protected ActorRef newShardActor(ShardInformation info) {
+        ShardInformation newInfo = new ShardInformation(info.getShardName(),
+                info.getShardId(), getPeerAddresses(info.getShardName()),
+                info.getDatastoreContext(),
+                TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+                peerAddressResolver);
+        newInfo.setSchemaContext(info.getSchemaContext());
+        newInfo.setActiveMember(info.isActiveMember());
+
+
+        localShards.put(info.getShardName(), newInfo);
+        return getContext().actorOf(newInfo.newProps().withDispatcher(shardDispatcherPath),
+                info.getShardId().toString());
+    }
+
+    @Override
+    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
+                                        Map<String, String> peerAddresses,
+                                        DatastoreContext datastoreContext,
+                                        Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, TestShard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
+    public static class TestShardManagerCreator extends AbstractShardManagerCreator<TestShardManagerCreator> {
+        @Override
+        public Props props() {
+            verify();
+            return Props.create(TestShardManager.class, this);
+        }
+    }
+}