<version>3.1.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.1.6</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
- ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+ AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
.datastoreContextFactory(datastoreContextFactory)
.waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
.primaryShardInfoCache(primaryShardInfoCache)
.duration().toMillis() * READY_WAIT_FACTOR;
}
+    /**
+     * Returns the creator used to build this datastore's ShardManager. Production code
+     * returns the standard ShardManagerCreator; test subclasses override this to supply
+     * a test ShardManager implementation.
+     */
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new ShardManagerCreator();
+    }
+
protected final DataStoreClient getClient() {
return client;
}
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
- final String shardDispatcher, final String shardManagerId) {
+ private static ActorRef createShardManager(final ActorSystem actorSystem,
+ final AbstractShardManagerCreator<?> creator, final String shardDispatcher,
+ final String shardManagerId) {
Exception lastException = null;
for (int i = 0; i < 100; i++) {
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata;
+ @VisibleForTesting
+ final FrontendMetadata frontendMetadata;
+
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
private boolean paused;
}
public abstract static class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
- private final Class<S> shardClass;
+ private final Class<? extends S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private DataTree dataTree;
+
private volatile boolean sealed;
- protected AbstractBuilder(final Class<S> shardClass) {
+ protected AbstractBuilder(final Class<? extends S> shardClass) {
this.shardClass = shardClass;
}
public static class Builder extends AbstractBuilder<Builder, Shard> {
Builder() {
- super(Shard.class);
+ this(Shard.class);
+ }
+
+ Builder(Class<? extends Shard> shardClass) {
+ super(shardClass);
}
}
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.serialization.Serialization;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashSet;
schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
}
+    /**
+     * Exposes the Shard builder backing this ShardInformation so callers (tests) can
+     * inspect its configuration, e.g. the restore-from-snapshot it carries.
+     */
+    @VisibleForTesting
+    Shard.AbstractBuilder<?, ?> getBuilder() {
+        return builder;
+    }
+
@Override
public String toString() {
return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
// Stores a mapping between a shard name and it's corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
- private final Map<String, ShardInformation> localShards = new HashMap<>();
+ @VisibleForTesting
+ final Map<String, ShardInformation> localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
private final Configuration configuration;
- private final String shardDispatcherPath;
+ @VisibleForTesting
+ final String shardDispatcherPath;
private final ShardManagerInfo shardManagerMBean;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- private final ShardPeerAddressResolver peerAddressResolver;
+ @VisibleForTesting
+ final ShardPeerAddressResolver peerAddressResolver;
private SchemaContext schemaContext;
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
- newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
- shardSnapshots.get(shardName)), peerAddressResolver));
+ localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), shardSnapshots));
}
}
+    /**
+     * Creates the ShardInformation for a locally configured shard. Package-visible hook:
+     * test shard managers override this to substitute a different Shard builder.
+     *
+     * @param shardName name of the shard
+     * @param shardId identifier of the shard
+     * @param peerAddresses resolved addresses of the shard's peers
+     * @param datastoreContext per-shard datastore context
+     * @param shardSnapshots snapshots to restore from, keyed by shard name (may lack this shard)
+     */
+    @VisibleForTesting
+    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
+            Map<String, String> peerAddresses,
+            DatastoreContext datastoreContext,
+            Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
/**
* Given the name of the shard find the addresses of all it's peers.
*
* @param shardName the shard name
*/
- private Map<String, String> getPeerAddresses(final String shardName) {
+ Map<String, String> getPeerAddresses(final String shardName) {
final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
return getPeerAddresses(shardName, members);
}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+/**
+ * ClientBackedDataStore variant for tests. Overrides the shard-manager factory hook so
+ * shards are managed by TestShardManager (and thus created as TestShard instances),
+ * allowing tests to introspect shard state such as frontend metadata.
+ */
+public class TestClientBackedDataStore extends ClientBackedDataStore {
+    // Mirrors the production constructor; all wiring is delegated to the superclass.
+    public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+                                     final Configuration configuration,
+                                     final DatastoreContextFactory datastoreContextFactory,
+                                     final DatastoreSnapshot restoreFromSnapshot) {
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+    }
+
+    // Package-visible constructor used when the client/actor plumbing is supplied directly.
+    TestClientBackedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier,
+                              final DataStoreClient clientActor) {
+        super(actorUtils, identifier, clientActor);
+    }
+
+    @Override
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new TestShardManager.TestShardManagerCreator();
+    }
+}
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.runners.Parameterized.Parameter;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
}
@Test
+ @Ignore("Flushes a closed tx leak in single node, needs to be handled separately")
public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
testKit.doCommit(writeTx.ready());
- writeTx = txChain.newWriteOnlyTransaction();
-
int numCars = 5;
for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.newCarPath("car" + i),
CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+ testKit.doCommit(writeTx.ready());
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
}
- testKit.doCommit(writeTx.ready());
+ // verify frontend metadata has no holes in purged transactions causing overtime memory leak
+ Optional<ActorRef> localShard = dataStore.getActorUtils().findLocalShard("cars-1");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+ Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+ assertEquals(1, ranges.size());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: "
+ Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class }, { ClientBackedDataStore.class }
+ { TestDistributedDataStore.class }, { TestClientBackedDataStore.class }
});
}
*/
package org.opendaylight.controller.cluster.datastore;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+ { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
});
}
}
}
+    /**
+     * Commits several car writes in quick succession on a transaction chain, then verifies
+     * via the leader shard's frontend metadata that purged transactions collapse into a
+     * single contiguous range — i.e. the purge bookkeeping does not leak ranges over time.
+     * Only the tell-based protocol tracks this metadata; ask-based is expected to track none.
+     */
+    @Test
+    public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.write(CarsModel.newCarPath("car" + i),
+                CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+            followerTestKit.doCommit(writeTx.ready());
+
+            // Interleave a read-only transaction after each commit to exercise the purge path.
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        // wait to let the shard catch up with purged
+        await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    // TestShard answers RequestFrontendMetadata with a metadata snapshot.
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        // Scan for the history with id 1 — presumably the chain's history; TODO confirm.
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        // All transactions purged, and as exactly one contiguous [0, 11) range.
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+                            metadata.getPurgedTransactions().asRanges().iterator().next());
+                    } else {
+                        // ask based should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
+    /**
+     * Opens and immediately closes write transactions on a chain, then verifies via the
+     * leader shard's frontend metadata that the aborted/closed transactions are fully
+     * purged (no closed transactions left, purges form a single range). Currently ignored:
+     * it exposes a tell-based purge leak to be fixed separately.
+     */
+    @Test
+    @Ignore("Flushes out tell based leak needs to be handled separately")
+    public void testCloseTransactionMetadataLeak() throws Exception {
+        // Ask based frontend seems to have some issues with back to back close
+        Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+        final String testName = "testWriteTransactionWithSingleShard";
+        initDatastoresWithCars(testName);
+
+        final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        int numCars = 5;
+        for (int i = 0; i < numCars; i++) {
+            // Open a write transaction and close it without committing — the case under test.
+            writeTx = txChain.newWriteOnlyTransaction();
+            writeTx.close();
+
+            DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+            domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+            domStoreReadTransaction.close();
+        }
+
+        writeTx = txChain.newWriteOnlyTransaction();
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        followerTestKit.doCommit(writeTx.ready());
+
+        // wait to let the shard catch up with purged
+        await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> {
+                    Optional<ActorRef> localShard =
+                            leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+                    // TestShard answers RequestFrontendMetadata with a metadata snapshot.
+                    FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+                            (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+                                    .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+                    if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+                        // Scan for the history with id 1 — presumably the chain's history; TODO confirm.
+                        Iterator<FrontendHistoryMetadata> iterator =
+                                frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+                        FrontendHistoryMetadata metadata = iterator.next();
+                        while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+                            metadata = iterator.next();
+                        }
+
+                        Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+                        // Nothing lingering in "closed"; purges collapsed into one range.
+                        assertEquals(0, metadata.getClosedTransactions().size());
+                        assertEquals(1, ranges.size());
+                    } else {
+                        // ask based should track no metadata
+                        assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+                    }
+                });
+
+        final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+                .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertTrue("isPresent", optional.isPresent());
+        assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+    }
+
@Test
public void testReadWriteTransactionWithSingleShard() throws Exception {
initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
followerDatastoreContextBuilder.shardBatchedModificationCount(2);
leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
final String testName = "testLeadershipTransferOnShutdown";
@Test
public void testTransactionWithIsolatedLeader() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
// Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
final String testName = "testTransactionWithIsolatedLeader";
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
|| e.getCause() instanceof ShardLeaderNotRespondingException);
} else {
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@Test
public void testReadWriteMessageSlicing() throws Exception {
// The slicing is only implemented for tell-based protocol
- Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+ Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
followerDatastoreContextBuilder.maximumMessageSliceSize(100);
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class }});
+ { TestDistributedDataStore.class }, { TestClientBackedDataStore.class }
+ });
}
@Before
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
setDataStoreName(typeName);
+ // Make sure we set up datastore context correctly
+ datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
+
final DatastoreContext datastoreContext = datastoreContextBuilder.build();
final DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+/**
+ * DistributedDataStore variant for tests. Overrides the shard-manager factory hook so
+ * shards are managed by TestShardManager (and thus created as TestShard instances),
+ * allowing tests to introspect shard state such as frontend metadata.
+ */
+public class TestDistributedDataStore extends DistributedDataStore {
+
+    // Mirrors the production constructor; all wiring is delegated to the superclass.
+    public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+                                    final Configuration configuration,
+                                    final DatastoreContextFactory datastoreContextFactory,
+                                    final DatastoreSnapshot restoreFromSnapshot) {
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+    }
+
+    // Package-visible constructor used when the actor plumbing is supplied directly.
+    TestDistributedDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier) {
+        super(actorUtils, identifier);
+    }
+
+    @Override
+    protected AbstractShardManagerCreator<?> getShardManagerCreator() {
+        return new TestShardManager.TestShardManagerCreator();
+    }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
+
+/**
+ * Shard subclass for tests which additionally answers a {@link RequestFrontendMetadata}
+ * message with a snapshot of the shard's frontend metadata, letting tests introspect
+ * transaction/purge bookkeeping that is otherwise internal to the shard.
+ */
+public class TestShard extends Shard {
+    // Message to request FrontendMetadata
+    public static final class RequestFrontendMetadata {
+
+    }
+
+    protected TestShard(AbstractBuilder<?, ?> builder) {
+        super(builder);
+    }
+
+    @Override
+    protected void handleNonRaftCommand(Object message) {
+        if (message instanceof RequestFrontendMetadata) {
+            // frontendMetadata is package-visible on Shard (@VisibleForTesting) for this purpose.
+            FrontendShardDataTreeSnapshotMetadata metadataSnapshot = frontendMetadata.toSnapshot();
+            sender().tell(metadataSnapshot, self());
+        } else {
+            super.handleNonRaftCommand(message);
+        }
+    }
+
+    public static Shard.Builder builder() {
+        return new TestShard.Builder();
+    }
+
+    // Builder producing TestShard actors; relies on Shard.Builder accepting a shard subclass.
+    public static class Builder extends Shard.Builder {
+        Builder() {
+            super(TestShard.class);
+        }
+    }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.TestShard;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+
+/**
+ * ShardManager subclass for tests which substitutes TestShard for the production Shard
+ * both when creating shard actors and when building local shard information, so tests
+ * can query shards for otherwise-internal state (see TestShard).
+ */
+public class TestShardManager extends ShardManager {
+    TestShardManager(AbstractShardManagerCreator<?> builder) {
+        super(builder);
+    }
+
+    /**
+     * Plug into shard actor creation to replace info with our testing one.
+     * @param info shard info.
+     * @return actor for replaced shard info.
+     */
+    @Override
+    protected ActorRef newShardActor(ShardInformation info) {
+        // Rebuild the ShardInformation around a TestShard builder, carrying over the
+        // original's snapshot, schema context and active-member state.
+        ShardInformation newInfo = new ShardInformation(info.getShardName(),
+                info.getShardId(), getPeerAddresses(info.getShardName()),
+                info.getDatastoreContext(),
+                TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+                peerAddressResolver);
+        newInfo.setSchemaContext(info.getSchemaContext());
+        newInfo.setActiveMember(info.isActiveMember());
+
+
+        // NOTE(review): this re-inserts the original 'info' (a no-op if it already came from
+        // localShards) while the actor below is created from 'newInfo' — confirm whether
+        // 'newInfo' was intended to be stored here instead.
+        localShards.put(info.getShardName(), info);
+        return getContext().actorOf(newInfo.newProps().withDispatcher(shardDispatcherPath),
+            info.getShardId().toString());
+    }
+
+    /**
+     * Overrides the shard-info factory so statically configured local shards are also
+     * backed by TestShard rather than the production Shard.
+     */
+    @Override
+    ShardInformation createShardInfoFor(String shardName, ShardIdentifier shardId,
+            Map<String, String> peerAddresses,
+            DatastoreContext datastoreContext,
+            Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
+        return new ShardInformation(shardName, shardId, peerAddresses,
+                datastoreContext, TestShard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
+                peerAddressResolver);
+    }
+
+    /** Creator that produces TestShardManager actors in place of the production ShardManager. */
+    public static class TestShardManagerCreator extends AbstractShardManagerCreator<TestShardManagerCreator> {
+        @Override
+        public Props props() {
+            verify();
+            return Props.create(TestShardManager.class, this);
+        }
+    }
+}