From: Tomas Cere Date: Thu, 8 Dec 2016 10:54:07 +0000 (+0100) Subject: BUG-2138: DistributedShardListeners support for nested shards X-Git-Tag: release/carbon~157 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=20f8f30f4bbf1e982672c1f883a6a18b0e4539de;hp=13a15f0230ccc28a2ed7212b59dc785accf97b9f BUG-2138: DistributedShardListeners support for nested shards Adds support for listeners in shards that have a subshard/s, which re-asseble notifications received from subshards. Change-Id: Icc7dfb971731d78c306a87335e54668f3bbc133e Signed-off-by: Tomas Cere Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java new file mode 100644 index 0000000000..5e49c0069e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.sharding; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; +import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration; +import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree; +import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode; +import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedShardChangePublisher + extends AbstractRegistrationTree> + implements DOMStoreTreeChangePublisher { + + private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class); + + private final DistributedDataStore distributedDataStore; + private final YangInstanceIdentifier shardPath; + + // This will be useful for signaling back pressure + private final DataStoreClient client; + + private final Map childShards; + + @GuardedBy("this") + private final DataTree dataTree; + + public DistributedShardChangePublisher(final DataStoreClient client, + final DistributedDataStore distributedDataStore, + final DOMDataTreeIdentifier prefix, + final Map childShards) { + this.client = client; + this.distributedDataStore = distributedDataStore; + // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea + // maybe the whole listener logic would be better in the backend shards where we have direct access to the + // dataTree and wont have to cache it redundantly. + this.dataTree = InMemoryDataTreeFactory.getInstance().create( + TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier()); + + dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext()); + + this.shardPath = prefix.getRootIdentifier(); + this.childShards = childShards; + } + + protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration registration) { + LOG.debug("Closing registration {}", registration); + } + + @Override + public AbstractDOMDataTreeChangeListenerRegistration + registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) { + takeLock(); + try { + return setupListenerContext(path, listener); + } finally { + releaseLock(); + } + } + + private AbstractDOMDataTreeChangeListenerRegistration + setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) { + // we need to register the listener registration path based on the shards root + // we have to strip the shard path from the listener path and then register + YangInstanceIdentifier strippedIdentifier = listenerPath; + if (!shardPath.isEmpty()) { + strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath)); + } + + final DOMDataTreeListenerWithSubshards subshardListener = + new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener); + final AbstractDOMDataTreeChangeListenerRegistration reg = + setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener); + + for (final ChildShardContext maybeAffected : childShards.values()) { + if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) { + // consumer has initialDataChangeEvent subshard somewhere on lower level + // register to the notification manager with snapshot and forward child notifications to parent + LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath); + subshardListener.addSubshard(maybeAffected); + } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) { + // bind path is inside subshard + // TODO can this happen? seems like in ShardedDOMDataTree we are + // already registering to the lowest shard possible + throw new UnsupportedOperationException("Listener should be registered directly " + + "into initialDataChangeEvent subshard"); + } + } + + return reg; + } + + private AbstractDOMDataTreeChangeListenerRegistration + setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup, + final YangInstanceIdentifier listenerPath, + final DOMDataTreeListenerWithSubshards listener) { + + LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath); + + // register in the shard tree + final RegistrationTreeNode> node = + findNodeFor(listenerPath.getPathArguments()); + + // register listener in CDS + final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore + .registerProxyListener(shardLookup, listenerPath, listener), listener); + + @SuppressWarnings("unchecked") + final AbstractDOMDataTreeChangeListenerRegistration registration = + new AbstractDOMDataTreeChangeListenerRegistration((L) listener) { + @Override + protected void removeRegistration() { + listener.close(); + DistributedShardChangePublisher.this.removeRegistration(node, this); + registrationRemoved(this); + proxyReg.close(); + } + }; + addRegistration(node, registration); + + return registration; + } + + private static Iterable stripShardPath(final YangInstanceIdentifier shardPath, + final YangInstanceIdentifier listenerPath) { + if (shardPath.isEmpty()) { + return listenerPath.getPathArguments(); + } + + final List listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments()); + final Iterator shardIter = shardPath.getPathArguments().iterator(); + final Iterator listenerIter = listenerPathArgs.iterator(); + + while (shardIter.hasNext()) { + if (shardIter.next().equals(listenerIter.next())) { + listenerIter.remove(); + } else { + break; + } + } + + return listenerPathArgs; + } + + private static class ProxyRegistration implements ListenerRegistration { + + private final ListenerRegistration proxy; + private final DOMDataTreeChangeListener listener; + + private ProxyRegistration( + final ListenerRegistration< + org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy, + final DOMDataTreeChangeListener listener) { + this.proxy = proxy; + this.listener = listener; + } + + @Override + public DOMDataTreeChangeListener getInstance() { + return listener; + } + + @Override + public void close() { + proxy.close(); + } + } + + synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath, + final Collection changes) { + final DataTreeModification modification = dataTree.takeSnapshot().newModification(); + for (final DataTreeCandidate change : changes) { + try { + DataTreeCandidates.applyToModification(modification, change); + } catch (SchemaValidationFailedException e) { + LOG.error("Validation failed {}", e); + } + } + + modification.ready(); + + final DataTreeCandidate candidate; + + try { + dataTree.validate(modification); + } catch (final DataValidationFailedException e) { + LOG.error("Validation failed for built modification, modification {}, current data tree: {}", + modification, dataTree, e); + throw new RuntimeException("Notification validation failed", e); + } + + // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree + candidate = dataTree.prepare(modification); + dataTree.commit(candidate); + + + DataTreeCandidateNode modifiedChild = candidate.getRootNode(); + + for (final PathArgument pathArgument : listenerPath.getPathArguments()) { + modifiedChild = modifiedChild.getModifiedChild(pathArgument); + } + + if (modifiedChild == null) { + modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument()); + } + + return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild); + } + + + private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener { + + private final YangInstanceIdentifier listenerPath; + private final DOMDataTreeChangeListener delegate; + + private final Map> registrations = + new ConcurrentHashMap<>(); + + DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath, + final DOMDataTreeChangeListener delegate) { + this.listenerPath = Preconditions.checkNotNull(listenerPath); + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public void onDataTreeChanged(@Nonnull final Collection changes) { + LOG.debug("Received data changed {}", changes); + applyChanges(listenerPath, changes); + delegate.onDataTreeChanged(changes); + } + + synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot, + final Collection changes) { + final YangInstanceIdentifier changeId = + YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot)); + + final List newCandidates = changes.stream() + .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode())) + .collect(Collectors.toList()); + delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates))); + } + + void addSubshard(final ChildShardContext context) { + Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher, + "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable"); + + final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard(); + // since this is going into subshard we want to listen for ALL changes in the subshard + registrations.put(context.getPrefix().getRootIdentifier(), + listenableShard.registerTreeChangeListener( + context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged( + context.getPrefix().getRootIdentifier(), changes))); + } + + void close() { + for (final ListenerRegistration registration : registrations.values()) { + registration.close(); + } + registrations.clear(); + } + } + + private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode { + + private final PathArgument identifier; + + EmptyDataTreeCandidateNode(final PathArgument identifier) { + this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null"); + } + + @Nonnull + @Override + public PathArgument getIdentifier() { + return identifier; + } + + @Nonnull + @Override + public Collection getChildNodes() { + return Collections.emptySet(); + } + + @Nullable + @Override + public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { + return null; + } + + @Nonnull + @Override + public ModificationType getModificationType() { + return ModificationType.UNMODIFIED; + } + + @Nonnull + @Override + public Optional> getDataAfter() { + return Optional.absent(); + } + + @Nonnull + @Override + public Optional> getDataBefore() { + return Optional.absent(); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java index 7664897479..53411a94dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java @@ -29,7 +29,6 @@ import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification; import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,14 +45,17 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { private final Map childShards = new HashMap<>(); @GuardedBy("this") private final List producers = new ArrayList<>(); - private final DistributedDataStore distributedDataStore; + + private final DistributedShardChangePublisher publisher; DistributedShardFrontend(final DistributedDataStore distributedDataStore, final DataStoreClient client, final DOMDataTreeIdentifier shardRoot) { - this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore); this.client = Preconditions.checkNotNull(client); this.shardRoot = Preconditions.checkNotNull(shardRoot); + + publisher = new DistributedShardChangePublisher(client, Preconditions.checkNotNull(distributedDataStore), + shardRoot, childShards); } @Override @@ -138,41 +140,6 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard { @SuppressWarnings("unchecked") public ListenerRegistration registerTreeChangeListener( final YangInstanceIdentifier treeId, final L listener) { - - final List toStrip = new ArrayList<>(shardRoot.getRootIdentifier().getPathArguments()); - final List stripFrom = new ArrayList<>(treeId.getPathArguments()); - - while (!toStrip.isEmpty()) { - stripFrom.remove(0); - toStrip.remove(0); - } - - return (ListenerRegistration) new ProxyRegistration(distributedDataStore - .registerProxyListener(treeId, YangInstanceIdentifier.create(stripFrom), listener), listener); - } - - private static class ProxyRegistration implements ListenerRegistration { - - private ListenerRegistration proxy; - private DOMDataTreeChangeListener listener; - - private ProxyRegistration( - final ListenerRegistration< - org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy, - final DOMDataTreeChangeListener listener) { - this.proxy = proxy; - this.listener = listener; - } - - @Override - public DOMDataTreeChangeListener getInstance() { - return listener; - } - - @Override - public void close() { - proxy.close(); - } + return publisher.registerTreeChangeListener(treeId, listener); } - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java index 63fd4a0867..04a75628b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java @@ -275,8 +275,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor { private void onProducerCreated(final ProducerCreated message) { LOG.debug("Received ProducerCreated: {}", message); - // fastpath if no replication is needed, since there is only one node - if (resolver.getShardingServicePeerActorAddresses().size() == 1) { + // fastpath if we have no peers + if (resolver.getShardingServicePeerActorAddresses().isEmpty()) { getSender().tell(new Status.Success(null), noSender()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java index febc929db5..53840049b4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java @@ -29,6 +29,8 @@ import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHist import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; @@ -108,6 +110,10 @@ public class DistributedShardFrontendTest { public void testClientTransaction() throws Exception { final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class); + final ActorContext context = mock(ActorContext.class); + doReturn(context).when(distributedDataStore).getActorContext(); + doReturn(SchemaContextHelper.full()).when(context).getSchemaContext(); + final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT); try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java index 27c5c49d70..8564e2d908 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java @@ -38,10 +38,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; -import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; -import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; -import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -150,7 +146,6 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { } @Test - @Ignore("Needs different shard creation handling due to replicas") public void testProducerRegistrations() throws Exception { initEmptyDatastores("config"); @@ -194,54 +189,34 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { followerProducer.close(); // try to create a shard on an already registered prefix on follower try { - followerShardFactory.createDistributedShard(TEST_ID, - Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)); + waitOnAsyncTask(followerShardFactory.createDistributedShard( + TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); fail("This prefix already should have a shard registration that was forwarded from the other node"); } catch (final DOMDataTreeShardingConflictException e) { - assertTrue(e.getMessage().contains("is already occupied by shard")); + assertTrue(e.getMessage().contains("is already occupied by another shard")); } } @Test - @Ignore("Needs different shard creation handling due to replicas") public void testWriteIntoMultipleShards() throws Exception { initEmptyDatastores("config"); leaderTestKit.waitForMembersUp("member-2"); - LOG.warn("registering first shard"); + LOG.debug("registering first shard"); final DistributedShardRegistration shardRegistration = waitOnAsyncTask(leaderShardFactory.createDistributedShard( TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); findLocalShard(followerDistributedDataStore.getActorContext(), ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())); - LOG.warn("Got after waiting for nonleader"); - final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager(); - - new JavaTestKit(leaderSystem) { - { - leaderShardManager.tell( - new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalShardFound.class); - - final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager(); - - followerShardManager.tell(new FindLocalShard( - ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef()); - followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class); - LOG.warn("Found follower shard"); - - leaderDistributedDataStore.getActorContext().getShardManager().tell( - new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef()); - expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class); - } - }; - + LOG.debug("Got after waiting for nonleader"); final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID)); final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true); @@ -256,7 +231,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest { cursor.close(); LOG.warn("Got to pre submit"); - tx.submit(); + tx.submit().checkedGet(); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java index 9841ca0125..91435bed5c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java @@ -36,6 +36,8 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,6 +48,7 @@ import org.mockito.Captor; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.ActorSystemProvider; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.AbstractTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -94,6 +97,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments()) .node(TestModel.INNER_LIST_QNAME)); + private static final Set SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME); private ActorSystem leaderSystem; @@ -307,18 +311,87 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } - private static Collection createInnerListMapEntries(final int amount, final String valuePrefix) { - final Collection ret = new ArrayList<>(); - for (int i = 0; i < amount; i++) { - ret.add(ImmutableNodes.mapEntryBuilder() - .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME, - QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i))) - .withChild(ImmutableNodes - .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i)) - .build()); + // top level shard at TEST element, with subshards on each outer-list map entry + @Test + public void testMultipleShardLevels() throws Exception { + initEmptyDatastore("config"); + + final DistributedShardRegistration testShardId = waitOnAsyncTask( + leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER), + DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION); + + final ArrayList registrations = new ArrayList<>(); + final int listSize = 5; + for (int i = 0; i < listSize; i++) { + final YangInstanceIdentifier entryYID = getOuterListIdFor(i); + final CompletionStage future = leaderShardFactory.createDistributedShard( + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, entryYID), SINGLE_MEMBER); + + registrations.add(waitOnAsyncTask(future, DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION)); } - return ret; + final DOMDataTreeIdentifier rootId = + new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY); + final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singletonList( + rootId)); + + DOMDataTreeCursorAwareTransaction transaction = producer.createTransaction(false); + + DOMDataTreeWriteCursor cursor = transaction.createCursor(rootId); + assertNotNull(cursor); + + final MapNode outerList = + ImmutableMapNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build(); + + final ContainerNode testNode = + ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(outerList) + .build(); + + cursor.write(testNode.getIdentifier(), testNode); + + cursor.close(); + transaction.submit().checkedGet(); + + final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class); + doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); + + final MapNode wholeList = ImmutableMapNodeBuilder.create(outerList) + .withValue(createOuterEntries(listSize, "testing-values")).build(); + + transaction = producer.createTransaction(false); + cursor = transaction.createCursor(TEST_ID); + assertNotNull(cursor); + + cursor.write(wholeList.getIdentifier(), wholeList); + cursor.close(); + + transaction.submit().checkedGet(); + + leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID), + true, Collections.emptyList()); + + // need 6 invocations, first initial thats from the parent shard, and then each individual subshard + verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(), + captorForSubtrees.capture()); + verifyNoMoreInteractions(mockedDataTreeListener); + final List>> allSubtrees = captorForSubtrees.getAllValues(); + + final Map> lastSubtree = allSubtrees.get(allSubtrees.size() - 1); + + final NormalizedNode actual = lastSubtree.get(TEST_ID); + assertNotNull(actual); + + final NormalizedNode expected = + ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)) + .withChild(ImmutableMapNodeBuilder.create(outerList) + .withValue(createOuterEntries(listSize, "testing-values")).build()) + .build(); + + assertEquals(expected, actual); } @Test @@ -381,6 +454,40 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest { } } + private static Collection createOuterEntries(final int amount, final String valuePrefix) { + final Collection ret = new ArrayList<>(); + for (int i = 0; i < amount; i++) { + ret.add(ImmutableNodes.mapEntryBuilder() + .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, + QName.create(TestModel.OUTER_LIST_QNAME, "id"), i)) + .withChild(ImmutableNodes + .leafNode(QName.create(TestModel.OUTER_LIST_QNAME, "id"), i)) + .withChild(createWholeInnerList(amount, "outer id: " + i + " " + valuePrefix)) + .build()); + } + + return ret; + } + + private static MapNode createWholeInnerList(final int amount, final String valuePrefix) { + return ImmutableMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME)) + .withValue(createInnerListMapEntries(amount, valuePrefix)).build(); + } + + private static Collection createInnerListMapEntries(final int amount, final String valuePrefix) { + final Collection ret = new ArrayList<>(); + for (int i = 0; i < amount; i++) { + ret.add(ImmutableNodes.mapEntryBuilder() + .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME, + QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i))) + .withChild(ImmutableNodes + .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i)) + .build()); + } + + return ret; + } + private static YangInstanceIdentifier getOuterListIdFor(final int id) { return TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates( TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id));