BUG-2138: DistributedShardListeners support for nested shards 89/49189/39
authorTomas Cere <tcere@cisco.com>
Thu, 8 Dec 2016 10:54:07 +0000 (11:54 +0100)
committerJakub Morvay <jmorvay@cisco.com>
Fri, 17 Mar 2017 17:01:28 +0000 (18:01 +0100)
Adds support for listeners in shards that have a subshard/s,
which re-asseble notifications received from subshards.

Change-Id: Icc7dfb971731d78c306a87335e54668f3bbc133e
Signed-off-by: Tomas Cere <tcere@cisco.com>
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontendTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeRemotingTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTreeTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java
new file mode 100644 (file)
index 0000000..5e49c00
--- /dev/null
@@ -0,0 +1,346 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
+import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
+import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedShardChangePublisher
+        extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
+        implements DOMStoreTreeChangePublisher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
+
+    private final DistributedDataStore distributedDataStore;
+    private final YangInstanceIdentifier shardPath;
+
+    // This will be useful for signaling back pressure
+    private final DataStoreClient client;
+
+    private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
+
+    @GuardedBy("this")
+    private final DataTree dataTree;
+
+    public DistributedShardChangePublisher(final DataStoreClient client,
+                                           final DistributedDataStore distributedDataStore,
+                                           final DOMDataTreeIdentifier prefix,
+                                           final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
+        this.client = client;
+        this.distributedDataStore = distributedDataStore;
+        // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
+        // maybe the whole listener logic would be better in the backend shards where we have direct access to the
+        // dataTree and wont have to cache it redundantly.
+        this.dataTree = InMemoryDataTreeFactory.getInstance().create(
+                TreeType.valueOf(prefix.getDatastoreType().name()), prefix.getRootIdentifier());
+
+        dataTree.setSchemaContext(distributedDataStore.getActorContext().getSchemaContext());
+
+        this.shardPath = prefix.getRootIdentifier();
+        this.childShards = childShards;
+    }
+
+    protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+        LOG.debug("Closing registration {}", registration);
+    }
+
+    @Override
+    public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+            registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
+        takeLock();
+        try {
+            return setupListenerContext(path, listener);
+        } finally {
+            releaseLock();
+        }
+    }
+
+    private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+            setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
+        // we need to register the listener registration path based on the shards root
+        // we have to strip the shard path from the listener path and then register
+        YangInstanceIdentifier strippedIdentifier = listenerPath;
+        if (!shardPath.isEmpty()) {
+            strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
+        }
+
+        final DOMDataTreeListenerWithSubshards subshardListener =
+                new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
+        final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
+                setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
+
+        for (final ChildShardContext maybeAffected : childShards.values()) {
+            if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
+                // consumer has initialDataChangeEvent subshard somewhere on lower level
+                // register to the notification manager with snapshot and forward child notifications to parent
+                LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
+                subshardListener.addSubshard(maybeAffected);
+            } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
+                // bind path is inside subshard
+                // TODO can this happen? seems like in ShardedDOMDataTree we are
+                // already registering to the lowest shard possible
+                throw new UnsupportedOperationException("Listener should be registered directly "
+                        + "into initialDataChangeEvent subshard");
+            }
+        }
+
+        return reg;
+    }
+
+    private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
+            setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
+                                         final YangInstanceIdentifier listenerPath,
+                                         final DOMDataTreeListenerWithSubshards listener) {
+
+        LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
+
+        // register in the shard tree
+        final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
+                findNodeFor(listenerPath.getPathArguments());
+
+        // register listener in CDS
+        final ProxyRegistration proxyReg = new ProxyRegistration(distributedDataStore
+                .registerProxyListener(shardLookup, listenerPath, listener), listener);
+
+        @SuppressWarnings("unchecked")
+        final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
+            new AbstractDOMDataTreeChangeListenerRegistration<L>((L) listener) {
+                @Override
+                protected void removeRegistration() {
+                    listener.close();
+                    DistributedShardChangePublisher.this.removeRegistration(node, this);
+                    registrationRemoved(this);
+                    proxyReg.close();
+                }
+            };
+        addRegistration(node, registration);
+
+        return registration;
+    }
+
+    private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
+                                                         final YangInstanceIdentifier listenerPath) {
+        if (shardPath.isEmpty()) {
+            return listenerPath.getPathArguments();
+        }
+
+        final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
+        final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
+        final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
+
+        while (shardIter.hasNext()) {
+            if (shardIter.next().equals(listenerIter.next())) {
+                listenerIter.remove();
+            } else {
+                break;
+            }
+        }
+
+        return listenerPathArgs;
+    }
+
+    private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+
+        private final ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
+        private final DOMDataTreeChangeListener listener;
+
+        private ProxyRegistration(
+                final ListenerRegistration<
+                        org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
+                final DOMDataTreeChangeListener listener) {
+            this.proxy = proxy;
+            this.listener = listener;
+        }
+
+        @Override
+        public DOMDataTreeChangeListener getInstance() {
+            return listener;
+        }
+
+        @Override
+        public void close() {
+            proxy.close();
+        }
+    }
+
+    synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
+            final Collection<DataTreeCandidate> changes) {
+        final DataTreeModification modification = dataTree.takeSnapshot().newModification();
+        for (final DataTreeCandidate change : changes) {
+            try {
+                DataTreeCandidates.applyToModification(modification, change);
+            } catch (SchemaValidationFailedException e) {
+                LOG.error("Validation failed {}", e);
+            }
+        }
+
+        modification.ready();
+
+        final DataTreeCandidate candidate;
+
+        try {
+            dataTree.validate(modification);
+        } catch (final DataValidationFailedException e) {
+            LOG.error("Validation failed for built modification, modification {}, current data tree: {}",
+                    modification, dataTree, e);
+            throw new RuntimeException("Notification validation failed", e);
+        }
+
+        // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
+        candidate = dataTree.prepare(modification);
+        dataTree.commit(candidate);
+
+
+        DataTreeCandidateNode modifiedChild = candidate.getRootNode();
+
+        for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
+            modifiedChild = modifiedChild.getModifiedChild(pathArgument);
+        }
+
+        if (modifiedChild == null) {
+            modifiedChild = new EmptyDataTreeCandidateNode(dataTree.getRootPath().getLastPathArgument());
+        }
+
+        return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
+    }
+
+
+    private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
+
+        private final YangInstanceIdentifier listenerPath;
+        private final DOMDataTreeChangeListener delegate;
+
+        private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
+                new ConcurrentHashMap<>();
+
+        DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
+                                         final DOMDataTreeChangeListener delegate) {
+            this.listenerPath = Preconditions.checkNotNull(listenerPath);
+            this.delegate = Preconditions.checkNotNull(delegate);
+        }
+
+        @Override
+        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+            LOG.debug("Received data changed {}", changes);
+            applyChanges(listenerPath, changes);
+            delegate.onDataTreeChanged(changes);
+        }
+
+        synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
+                                            final Collection<DataTreeCandidate> changes) {
+            final YangInstanceIdentifier changeId =
+                    YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
+
+            final List<DataTreeCandidate> newCandidates = changes.stream()
+                    .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
+                    .collect(Collectors.toList());
+            delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
+        }
+
+        void addSubshard(final ChildShardContext context) {
+            Preconditions.checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
+                    "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
+
+            final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
+            // since this is going into subshard we want to listen for ALL changes in the subshard
+            registrations.put(context.getPrefix().getRootIdentifier(),
+                    listenableShard.registerTreeChangeListener(
+                            context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
+                                    context.getPrefix().getRootIdentifier(), changes)));
+        }
+
+        void close() {
+            for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
+                registration.close();
+            }
+            registrations.clear();
+        }
+    }
+
+    private static final class EmptyDataTreeCandidateNode implements DataTreeCandidateNode {
+
+        private final PathArgument identifier;
+
+        EmptyDataTreeCandidateNode(final PathArgument identifier) {
+            this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+        }
+
+        @Nonnull
+        @Override
+        public PathArgument getIdentifier() {
+            return identifier;
+        }
+
+        @Nonnull
+        @Override
+        public Collection<DataTreeCandidateNode> getChildNodes() {
+            return Collections.emptySet();
+        }
+
+        @Nullable
+        @Override
+        public DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
+            return null;
+        }
+
+        @Nonnull
+        @Override
+        public ModificationType getModificationType() {
+            return ModificationType.UNMODIFIED;
+        }
+
+        @Nonnull
+        @Override
+        public Optional<NormalizedNode<?, ?>> getDataAfter() {
+            return Optional.absent();
+        }
+
+        @Nonnull
+        @Override
+        public Optional<NormalizedNode<?, ?>> getDataBefore() {
+            return Optional.absent();
+        }
+    }
+}
index 76648974799aee9d1ebfb80476a98586ef070454..53411a94dc0f75b767b6f5009623cc8b20f88be7 100644 (file)
@@ -29,7 +29,6 @@ import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
 import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,14 +45,17 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
     @GuardedBy("this")
     private final List<ShardProxyProducer> producers = new ArrayList<>();
-    private final DistributedDataStore distributedDataStore;
+
+    private final DistributedShardChangePublisher publisher;
 
     DistributedShardFrontend(final DistributedDataStore distributedDataStore,
                              final DataStoreClient client,
                              final DOMDataTreeIdentifier shardRoot) {
-        this.distributedDataStore = Preconditions.checkNotNull(distributedDataStore);
         this.client = Preconditions.checkNotNull(client);
         this.shardRoot = Preconditions.checkNotNull(shardRoot);
+
+        publisher = new DistributedShardChangePublisher(client, Preconditions.checkNotNull(distributedDataStore),
+                shardRoot, childShards);
     }
 
     @Override
@@ -138,41 +140,6 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
     @SuppressWarnings("unchecked")
     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
             final YangInstanceIdentifier treeId, final L listener) {
-
-        final List<PathArgument> toStrip = new ArrayList<>(shardRoot.getRootIdentifier().getPathArguments());
-        final List<PathArgument> stripFrom = new ArrayList<>(treeId.getPathArguments());
-
-        while (!toStrip.isEmpty()) {
-            stripFrom.remove(0);
-            toStrip.remove(0);
-        }
-
-        return (ListenerRegistration<L>) new ProxyRegistration(distributedDataStore
-                .registerProxyListener(treeId, YangInstanceIdentifier.create(stripFrom), listener), listener);
-    }
-
-    private static class ProxyRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
-
-        private ListenerRegistration<org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy;
-        private DOMDataTreeChangeListener listener;
-
-        private ProxyRegistration(
-                final ListenerRegistration<
-                        org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> proxy,
-                final DOMDataTreeChangeListener listener) {
-            this.proxy = proxy;
-            this.listener = listener;
-        }
-
-        @Override
-        public DOMDataTreeChangeListener getInstance() {
-            return listener;
-        }
-
-        @Override
-        public void close() {
-            proxy.close();
-        }
+        return publisher.registerTreeChangeListener(treeId, listener);
     }
-
 }
index 63fd4a0867ddb340fd55658e01c3e29e515fd163..04a75628b7d8d6583b661b27ce71d9f6a7e3ccac 100644 (file)
@@ -275,8 +275,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     private void onProducerCreated(final ProducerCreated message) {
         LOG.debug("Received ProducerCreated: {}", message);
 
-        // fastpath if no replication is needed, since there is only one node
-        if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
+        // fastpath if we have no peers
+        if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
             getSender().tell(new Status.Success(null), noSender());
         }
 
index febc929db5d4ec17768e8253ece1a8f7fd84b5ba..53840049b4f9b283aabb1060069b935295208e3a 100644 (file)
@@ -29,6 +29,8 @@ import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHist
 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
@@ -108,6 +110,10 @@ public class DistributedShardFrontendTest {
     public void testClientTransaction() throws Exception {
 
         final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class);
+        final ActorContext context = mock(ActorContext.class);
+        doReturn(context).when(distributedDataStore).getActorContext();
+        doReturn(SchemaContextHelper.full()).when(context).getSchemaContext();
+
         final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT);
 
         try (final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) {
index 27c5c49d70ec114bbb5b18414bb85728aa7a885f..8564e2d90859098b87a2a1ad4f2fe5880da2f150 100644 (file)
@@ -38,10 +38,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
 import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -150,7 +146,6 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
     }
 
     @Test
-    @Ignore("Needs different shard creation handling due to replicas")
     public void testProducerRegistrations() throws Exception {
         initEmptyDatastores("config");
 
@@ -194,54 +189,34 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         followerProducer.close();
         // try to create a shard on an already registered prefix on follower
         try {
-            followerShardFactory.createDistributedShard(TEST_ID,
-                    Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+            waitOnAsyncTask(followerShardFactory.createDistributedShard(
+                    TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+                    DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
             fail("This prefix already should have a shard registration that was forwarded from the other node");
         } catch (final DOMDataTreeShardingConflictException e) {
-            assertTrue(e.getMessage().contains("is already occupied by shard"));
+            assertTrue(e.getMessage().contains("is already occupied by another shard"));
         }
     }
 
     @Test
-    @Ignore("Needs different shard creation handling due to replicas")
     public void testWriteIntoMultipleShards() throws Exception {
         initEmptyDatastores("config");
 
         leaderTestKit.waitForMembersUp("member-2");
 
-        LOG.warn("registering first shard");
+        LOG.debug("registering first shard");
         final DistributedShardRegistration shardRegistration =
                 waitOnAsyncTask(leaderShardFactory.createDistributedShard(
                         TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
                         DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
 
+
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
         findLocalShard(followerDistributedDataStore.getActorContext(),
                 ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
 
-        LOG.warn("Got after waiting for nonleader");
-        final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
-
-        new JavaTestKit(leaderSystem) {
-            {
-                leaderShardManager.tell(
-                        new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
-                final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
-                followerShardManager.tell(new FindLocalShard(
-                        ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
-                followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-                LOG.warn("Found follower shard");
-
-                leaderDistributedDataStore.getActorContext().getShardManager().tell(
-                        new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
-                expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
-            }
-        };
-
+        LOG.debug("Got after waiting for nonleader");
         final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
 
         final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
@@ -256,7 +231,7 @@ public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
         cursor.close();
         LOG.warn("Got to pre submit");
 
-        tx.submit();
+        tx.submit().checkedGet();
     }
 
     @Test
index 9841ca01250f27ef1ca0ede6fa5a28e5dd12e351..91435bed5c1efd8076ee52b59ff91ffc3f506049 100644 (file)
@@ -36,6 +36,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,6 +48,7 @@ import org.mockito.Captor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.datastore.AbstractTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
@@ -94,6 +97,7 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
                     YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments())
                             .node(TestModel.INNER_LIST_QNAME));
+    private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME);
 
     private ActorSystem leaderSystem;
 
@@ -307,18 +311,87 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
 
     }
 
-    private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
-        final Collection<MapEntryNode> ret = new ArrayList<>();
-        for (int i = 0; i < amount; i++) {
-            ret.add(ImmutableNodes.mapEntryBuilder()
-                    .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
-                            QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
-                    .withChild(ImmutableNodes
-                            .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
-                    .build());
+    // top level shard at TEST element, with subshards on each outer-list map entry
+    @Test
+    public void testMultipleShardLevels() throws Exception {
+        initEmptyDatastore("config");
+
+        final DistributedShardRegistration testShardId = waitOnAsyncTask(
+                leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER),
+                DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+        final ArrayList<DistributedShardRegistration> registrations = new ArrayList<>();
+        final int listSize = 5;
+        for (int i = 0; i < listSize; i++) {
+            final YangInstanceIdentifier entryYID = getOuterListIdFor(i);
+            final CompletionStage<DistributedShardRegistration> future = leaderShardFactory.createDistributedShard(
+                    new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, entryYID), SINGLE_MEMBER);
+
+            registrations.add(waitOnAsyncTask(future, DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION));
         }
 
-        return ret;
+        final DOMDataTreeIdentifier rootId =
+                new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
+        final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singletonList(
+                rootId));
+
+        DOMDataTreeCursorAwareTransaction transaction = producer.createTransaction(false);
+
+        DOMDataTreeWriteCursor cursor = transaction.createCursor(rootId);
+        assertNotNull(cursor);
+
+        final MapNode outerList =
+                ImmutableMapNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        final ContainerNode testNode =
+                ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+                        .withChild(outerList)
+                        .build();
+
+        cursor.write(testNode.getIdentifier(), testNode);
+
+        cursor.close();
+        transaction.submit().checkedGet();
+
+        final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
+        doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
+
+        final MapNode wholeList = ImmutableMapNodeBuilder.create(outerList)
+                .withValue(createOuterEntries(listSize, "testing-values")).build();
+
+        transaction = producer.createTransaction(false);
+        cursor = transaction.createCursor(TEST_ID);
+        assertNotNull(cursor);
+
+        cursor.write(wholeList.getIdentifier(), wholeList);
+        cursor.close();
+
+        transaction.submit().checkedGet();
+
+        leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
+                true, Collections.emptyList());
+
+        // need 6 invocations, first initial thats from the parent shard, and then each individual subshard
+        verify(mockedDataTreeListener, timeout(10000).times(6)).onDataTreeChanged(captorForChanges.capture(),
+                captorForSubtrees.capture());
+        verifyNoMoreInteractions(mockedDataTreeListener);
+        final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();
+
+        final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> lastSubtree = allSubtrees.get(allSubtrees.size() - 1);
+
+        final NormalizedNode<?, ?> actual = lastSubtree.get(TEST_ID);
+        assertNotNull(actual);
+
+        final NormalizedNode<?, ?> expected =
+                ImmutableContainerNodeBuilder.create()
+                        .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
+                        .withChild(ImmutableMapNodeBuilder.create(outerList)
+                                .withValue(createOuterEntries(listSize, "testing-values")).build())
+                        .build();
+
+        assertEquals(expected, actual);
     }
 
     @Test
@@ -381,6 +454,40 @@ public class DistributedShardedDOMDataTreeTest extends AbstractTest {
         }
     }
 
+    private static Collection<MapEntryNode> createOuterEntries(final int amount, final String valuePrefix) {
+        final Collection<MapEntryNode> ret = new ArrayList<>();
+        for (int i = 0; i < amount; i++) {
+            ret.add(ImmutableNodes.mapEntryBuilder()
+                    .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME,
+                            QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
+                    .withChild(ImmutableNodes
+                            .leafNode(QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
+                    .withChild(createWholeInnerList(amount, "outer id: " + i + " " + valuePrefix))
+                    .build());
+        }
+
+        return ret;
+    }
+
+    private static MapNode createWholeInnerList(final int amount, final String valuePrefix) {
+        return ImmutableMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
+                .withValue(createInnerListMapEntries(amount, valuePrefix)).build();
+    }
+
+    private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
+        final Collection<MapEntryNode> ret = new ArrayList<>();
+        for (int i = 0; i < amount; i++) {
+            ret.add(ImmutableNodes.mapEntryBuilder()
+                    .withNodeIdentifier(new NodeIdentifierWithPredicates(TestModel.INNER_LIST_QNAME,
+                            QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
+                    .withChild(ImmutableNodes
+                            .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
+                    .build());
+        }
+
+        return ret;
+    }
+
     private static YangInstanceIdentifier getOuterListIdFor(final int id) {
         return TestModel.OUTER_LIST_PATH.node(new NodeIdentifierWithPredicates(
                 TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id));