From 3115b8171461584e85f58d87a9f179013cfbb262 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Thu, 9 May 2019 11:12:34 +0200 Subject: [PATCH] Add unit test for FrontedMetadata memory leaks This memory leak occurs when write and read-only transactions intertwine, leading to sparse range set. We need to be able to request frontend metadata from shards, so we can see whether it converges to a single range. Also rework testing datastores, so we can inject custom testing implementations allowing us to retrieve frontend metadata. JIRA: CONTROLLER-1879 Change-Id: Ia8f350be7831b0c8f6846ee6fa41a665d1e191f4 Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- .../md-sal/sal-distributed-datastore/pom.xml | 6 + .../cluster/datastore/AbstractDataStore.java | 12 +- .../controller/cluster/datastore/Shard.java | 15 +- .../shardmanager/ShardInformation.java | 6 + .../datastore/shardmanager/ShardManager.java | 26 ++- .../databroker/TestClientBackedDataStore.java | 38 +++++ ...ctDistributedDataStoreIntegrationTest.java | 45 ++++- .../DistributedDataStoreIntegrationTest.java | 4 +- ...butedDataStoreRemotingIntegrationTest.java | 155 ++++++++++++++++-- ...reWithSegmentedJournalIntegrationTest.java | 4 +- .../cluster/datastore/IntegrationTestKit.java | 4 + .../datastore/TestDistributedDataStore.java | 35 ++++ .../cluster/datastore/TestShard.java | 42 +++++ .../shardmanager/TestShardManager.java | 61 +++++++ 14 files changed, 421 insertions(+), 32 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index c10cbca772..0593b65665 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -220,6 +220,12 @@ 3.1.5 test + + org.awaitility + awaitility + 3.1.6 + test + commons-io commons-io diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java index 55108e0ef9..43314cd3ba 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java @@ -27,6 +27,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; @@ -89,7 +90,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache(); - ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration) + AbstractShardManagerCreator creator = getShardManagerCreator().cluster(cluster).configuration(configuration) .datastoreContextFactory(datastoreContextFactory) .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch) .primaryShardInfoCache(primaryShardInfoCache) @@ -147,6 +148,10 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface .duration().toMillis() * READY_WAIT_FACTOR; } + protected AbstractShardManagerCreator getShardManagerCreator() { + return new ShardManagerCreator(); + } + protected final DataStoreClient getClient() { return client; } @@ -253,8 +258,9 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface } @SuppressWarnings("checkstyle:IllegalCatch") - private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator, - final String shardDispatcher, final String shardManagerId) { + private static ActorRef createShardManager(final ActorSystem actorSystem, + final AbstractShardManagerCreator creator, final String shardDispatcher, + final String shardManagerId) { Exception lastException = null; for (int i = 0; i < 100; i++) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index d02bd32099..a7b89e1800 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -190,7 +190,9 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; - private final FrontendMetadata frontendMetadata; + @VisibleForTesting + final FrontendMetadata frontendMetadata; + private Map knownFrontends = ImmutableMap.of(); private boolean paused; @@ -1084,16 +1086,17 @@ public class Shard extends RaftActor { } public abstract static class AbstractBuilder, S extends Shard> { - private final Class shardClass; + private final Class shardClass; private ShardIdentifier id; private Map peerAddresses = Collections.emptyMap(); private DatastoreContext datastoreContext; private SchemaContextProvider schemaContextProvider; private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot; private DataTree dataTree; + private volatile boolean sealed; - protected AbstractBuilder(final Class shardClass) { + protected AbstractBuilder(final Class shardClass) { this.shardClass = shardClass; } @@ -1194,7 +1197,11 @@ public class Shard extends RaftActor { public static class Builder extends AbstractBuilder { Builder() { - super(Shard.class); + this(Shard.class); + } + + Builder(Class shardClass) { + super(shardClass); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java index 97cbc5b0e0..c1ccf65772 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardInformation.java @@ -12,6 +12,7 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; import akka.actor.Props; import akka.serialization.Serialization; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import java.util.HashSet; @@ -280,6 +281,11 @@ final class ShardInformation { schemaContextProvider.set(Preconditions.checkNotNull(schemaContext)); } + @VisibleForTesting + Shard.AbstractBuilder getBuilder() { + return builder; + } + @Override public String toString() { return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized=" diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index e1752cc57d..43446b5ce2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -137,7 +137,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Stores a mapping between a shard name and it's corresponding information // Shard names look like inventory, topology etc and are as specified in // configuration - private final Map localShards = new HashMap<>(); + @VisibleForTesting + final Map localShards = new HashMap<>(); // The type of a ShardManager reflects the type of the datastore itself // A data store could be of type config/operational @@ -147,7 +148,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Configuration configuration; - private final String shardDispatcherPath; + @VisibleForTesting + final String shardDispatcherPath; private final ShardManagerInfo shardManagerMBean; @@ -157,7 +159,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; - private final ShardPeerAddressResolver peerAddressResolver; + @VisibleForTesting + final ShardPeerAddressResolver peerAddressResolver; private SchemaContext schemaContext; @@ -1279,18 +1282,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId); Map peerAddresses = getPeerAddresses(shardName); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, - newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot( - shardSnapshots.get(shardName)), peerAddressResolver)); + localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses, + newShardDatastoreContext(shardName), shardSnapshots)); } } + @VisibleForTesting + ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId, + Map peerAddresses, + DatastoreContext datastoreContext, + Map shardSnapshots) { + return new ShardInformation(shardName, shardId, peerAddresses, + datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)), + peerAddressResolver); + } + /** * Given the name of the shard find the addresses of all it's peers. * * @param shardName the shard name */ - private Map getPeerAddresses(final String shardName) { + Map getPeerAddresses(final String shardName) { final Collection members = configuration.getMembersFromShardName(shardName); return getPeerAddresses(shardName, members); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java new file mode 100644 index 0000000000..919e4d82c7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/databroker/TestClientBackedDataStore.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker; + +import akka.actor.ActorSystem; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.datastore.ClusterWrapper; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; + +public class TestClientBackedDataStore extends ClientBackedDataStore { + public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, + final Configuration configuration, + final DatastoreContextFactory datastoreContextFactory, + final DatastoreSnapshot restoreFromSnapshot) { + super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot); + } + + TestClientBackedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier, + final DataStoreClient clientActor) { + super(actorUtils, identifier, clientActor); + } + + @Override + protected AbstractShardManagerCreator getShardManagerCreator() { + return new TestShardManager.TestShardManagerCreator(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java index 62986b2ebf..b7c863a104 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractDistributedDataStoreIntegrationTest.java @@ -12,15 +12,17 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.runners.Parameterized.Parameter; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -30,20 +32,27 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Ignore; import org.junit.Test; +import org.junit.runners.Parameterized.Parameter; import org.mockito.Mockito; import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; @@ -244,6 +253,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { } @Test + @Ignore("Flushes a closed tx leak in single node, needs to be handled separately") public void testSingleTransactionsWritesInQuickSuccession() throws Exception { final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder); try (AbstractDataStore dataStore = testKit.setupAbstractDataStore( @@ -256,15 +266,40 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); testKit.doCommit(writeTx.ready()); - writeTx = txChain.newWriteOnlyTransaction(); - int numCars = 5; for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); writeTx.write(CarsModel.newCarPath("car" + i), CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + + testKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); } - testKit.doCommit(writeTx.ready()); + // verify frontend metadata has no holes in purged transactions causing overtime memory leak + Optional localShard = dataStore.getActorUtils().findLocalShard("cars-1"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + Set> ranges = metadata.getPurgedTransactions().asRanges(); + + assertEquals(1, ranges.size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } final Optional> optional = txChain.newReadOnlyTransaction() .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); @@ -341,7 +376,7 @@ public abstract class AbstractDistributedDataStoreIntegrationTest { } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException); } else { assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index d984535db8..9f88ae4c48 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -35,7 +35,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -55,7 +55,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractDistributedData @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class }, { ClientBackedDataStore.class } + { TestDistributedDataStore.class }, { TestClientBackedDataStore.class } }); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 04217d1944..6fead6bbad 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -30,6 +31,8 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -38,15 +41,18 @@ import java.math.BigInteger; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.junit.After; import org.junit.Assume; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -58,7 +64,9 @@ import org.opendaylight.controller.cluster.access.client.RequestTimeoutException import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -68,6 +76,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata; +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; @@ -118,7 +128,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 } + { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 } }); } @@ -328,6 +338,131 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { } } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception { + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + + followerTestKit.doCommit(writeTx.ready()); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + // wait to let the shard catch up with purged + await("Range set leak test").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)), + metadata.getPurgedTransactions().asRanges().iterator().next()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + + @Test + @Ignore("Flushes out tell based leak needs to be handled separately") + public void testCloseTransactionMetadataLeak() throws Exception { + // Ask based frontend seems to have some issues with back to back close + Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class)); + + final String testName = "testWriteTransactionWithSingleShard"; + initDatastoresWithCars(testName); + + final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + int numCars = 5; + for (int i = 0; i < numCars; i++) { + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.close(); + + DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction(); + domStoreReadTransaction.read(CarsModel.BASE_PATH).get(); + + domStoreReadTransaction.close(); + } + + writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + followerTestKit.doCommit(writeTx.ready()); + + // wait to let the shard catch up with purged + await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Optional localShard = + leaderDistributedDataStore.getActorUtils().findLocalShard("cars"); + FrontendShardDataTreeSnapshotMetadata frontendMetadata = + (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils() + .executeOperation(localShard.get(), new RequestFrontendMetadata()); + + if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) { + Iterator iterator = + frontendMetadata.getClients().get(0).getCurrentHistories().iterator(); + FrontendHistoryMetadata metadata = iterator.next(); + while (iterator.hasNext() && metadata.getHistoryId() != 1) { + metadata = iterator.next(); + } + + Set> ranges = metadata.getPurgedTransactions().asRanges(); + + assertEquals(0, metadata.getClosedTransactions().size()); + assertEquals(1, ranges.size()); + } else { + // ask based should track no metadata + assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty()); + } + }); + + final Optional> optional = txChain.newReadOnlyTransaction() + .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertTrue("isPresent", optional.isPresent()); + assertEquals("# cars", numCars, ((Collection) optional.get().getValue()).size()); + } + @Test public void testReadWriteTransactionWithSingleShard() throws Exception { initDatastoresWithCars("testReadWriteTransactionWithSingleShard"); @@ -771,8 +906,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionForwardedToLeaderAfterRetry() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); followerDatastoreContextBuilder.shardBatchedModificationCount(2); leaderDatastoreContextBuilder.shardBatchedModificationCount(2); initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry"); @@ -883,8 +1018,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testLeadershipTransferOnShutdown() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.shardBatchedModificationCount(1); followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null); final String testName = "testLeadershipTransferOnShutdown"; @@ -948,8 +1083,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithIsolatedLeader() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); + // FIXME: remove when test passes also for ClientBackedDataStore + Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter)); // Set the isolated leader check interval high so we can control the switch to IsolatedLeader. leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000); final String testName = "testTransactionWithIsolatedLeader"; @@ -1028,7 +1163,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException || e.getCause() instanceof ShardLeaderNotRespondingException); } else { @@ -1067,7 +1202,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - if (DistributedDataStore.class.equals(testParameter)) { + if (DistributedDataStore.class.isAssignableFrom(testParameter)) { assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); } else { assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); @@ -1157,7 +1292,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testReadWriteMessageSlicing() throws Exception { // The slicing is only implemented for tell-based protocol - Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter)); leaderDatastoreContextBuilder.maximumMessageSliceSize(100); followerDatastoreContextBuilder.maximumMessageSliceSize(100); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java index 73888de390..0090bf3a1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreWithSegmentedJournalIntegrationTest.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; @@ -53,7 +54,8 @@ public class DistributedDataStoreWithSegmentedJournalIntegrationTest @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class }}); + { TestDistributedDataStore.class }, { TestClientBackedDataStore.class } + }); } @Before diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java index 8dded6dd8d..2cb63ce79c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java @@ -29,6 +29,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.mockito.Mockito; +import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; @@ -132,6 +133,9 @@ public class IntegrationTestKit extends ShardTestKit { setDataStoreName(typeName); + // Make sure we set up datastore context correctly + datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation)); + final DatastoreContext datastoreContext = datastoreContextBuilder.build(); final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class); Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java new file mode 100644 index 0000000000..36064145dd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestDistributedDataStore.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSystem; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.datastore.config.Configuration; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator; +import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; + +public class TestDistributedDataStore extends DistributedDataStore { + + public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster, + final Configuration configuration, + final DatastoreContextFactory datastoreContextFactory, + final DatastoreSnapshot restoreFromSnapshot) { + super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot); + } + + TestDistributedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier) { + super(actorUtils, identifier); + } + + @Override + protected AbstractShardManagerCreator getShardManagerCreator() { + return new TestShardManager.TestShardManagerCreator(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java new file mode 100644 index 0000000000..b0e744a24a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TestShard.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata; + +public class TestShard extends Shard { + // Message to request FrontendMetadata + public static final class RequestFrontendMetadata { + + } + + protected TestShard(AbstractBuilder builder) { + super(builder); + } + + @Override + protected void handleNonRaftCommand(Object message) { + if (message instanceof RequestFrontendMetadata) { + FrontendShardDataTreeSnapshotMetadata metadataSnapshot = frontendMetadata.toSnapshot(); + sender().tell(metadataSnapshot, self()); + } else { + super.handleNonRaftCommand(message); + } + } + + public static Shard.Builder builder() { + return new TestShard.Builder(); + } + + public static class Builder extends Shard.Builder { + Builder() { + super(TestShard.class); + } + } +} + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java new file mode 100644 index 0000000000..0783c16464 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/shardmanager/TestShardManager.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.shardmanager; + +import akka.actor.ActorRef; +import akka.actor.Props; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.TestShard; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; + +public class TestShardManager extends ShardManager { + TestShardManager(AbstractShardManagerCreator builder) { + super(builder); + } + + /** + * Plug into shard actor creation to replace info with our testing one. + * @param info shard info. + * @return actor for replaced shard info. + */ + @Override + protected ActorRef newShardActor(ShardInformation info) { + ShardInformation newInfo = new ShardInformation(info.getShardName(), + info.getShardId(), getPeerAddresses(info.getShardName()), + info.getDatastoreContext(), + TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()), + peerAddressResolver); + newInfo.setSchemaContext(info.getSchemaContext()); + newInfo.setActiveMember(info.isActiveMember()); + + + localShards.put(info.getShardName(), info); + return getContext().actorOf(newInfo.newProps().withDispatcher(shardDispatcherPath), + info.getShardId().toString()); + } + + @Override + ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId, + Map peerAddresses, + DatastoreContext datastoreContext, + Map shardSnapshots) { + return new ShardInformation(shardName, shardId, peerAddresses, + datastoreContext, TestShard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)), + peerAddressResolver); + } + + public static class TestShardManagerCreator extends AbstractShardManagerCreator { + @Override + public Props props() { + verify(); + return Props.create(TestShardManager.class, this); + } + } +} -- 2.36.6