Bug 509: Added In-memory datastore support for wildcarded change listeners 43/6143/6
authorTony Tkacik <ttkacik@cisco.com>
Thu, 10 Apr 2014 13:39:31 +0000 (15:39 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Mon, 14 Apr 2014 20:21:38 +0000 (20:21 +0000)
Added support for wildcarded data change listeners,
which may register to updates of list entries (and their
subentries) without specifying key for list items.

Support for Wildcarded Data Change Listneners is done by
refactoring DataChangeEventResolver into Map/Reduce processing.

Map phase:
When visiting each modified node (with key) we also
looks for listener listening on same level without key.
This actually may trigger creation (mapping) of multiple events
for same listeners (eg. two flows we're added at the same time).

Reduce phase:
We walked thru all listeners to be notified and if they have
multiple events assigned, we merge them into one even to be
delivered.

Change-Id: Ic05ddae10bd0b316009cc5bafdeca76350f00390
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/util/compat/DataNormalizer.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMImmutableDataChangeEvent.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java [deleted file]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerTree.java

diff --git a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java
new file mode 100644 (file)
index 0000000..fc303e0
--- /dev/null
@@ -0,0 +1,195 @@
+package org.opendaylight.controller.md.sal.binding.data;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.sal.binding.test.AbstractDataServiceTest;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+public class WildcardedDataChangeListenerTest extends AbstractDataServiceTest {
+
+    private static final NodeKey NODE_0_KEY = new NodeKey(new NodeId("test:0"));
+    private static final NodeKey NODE_1_KEY = new NodeKey(new NodeId("test:1"));
+
+    public static final InstanceIdentifier<Flow> DEEP_WILDCARDED_PATH = InstanceIdentifier.builder(Nodes.class)
+            .child(Node.class) //
+            .augmentation(FlowCapableNode.class) //
+            .child(Table.class) //
+            .child(Flow.class) //
+            .build();
+
+    private static final TableKey TABLE_0_KEY = new TableKey((short) 0);
+    private static final TableFeaturesKey TABLE_FEATURES_KEY = new TableFeaturesKey((short) 0);
+
+    private static final InstanceIdentifier<Table> NODE_0_TABLE_PATH = InstanceIdentifier.builder(Nodes.class)
+            .child(Node.class, NODE_0_KEY) //
+            .augmentation(FlowCapableNode.class) //
+            .child(Table.class, TABLE_0_KEY) //
+            .build();
+
+    private static final InstanceIdentifier<Table> NODE_1_TABLE_PATH = InstanceIdentifier.builder(Nodes.class)
+            .child(Node.class, NODE_1_KEY) //
+            .augmentation(FlowCapableNode.class) //
+            .child(Table.class, TABLE_0_KEY) //
+            .build();
+
+    private static final FlowKey FLOW_KEY = new FlowKey(new FlowId("test"));
+
+    private static final InstanceIdentifier<Flow> NODE_0_FLOW_PATH = InstanceIdentifier.builder(NODE_0_TABLE_PATH)
+            .child(Flow.class, FLOW_KEY).build();
+
+    private static final InstanceIdentifier<Flow> NODE_1_FLOW_PATH = InstanceIdentifier.builder(NODE_1_TABLE_PATH)
+            .child(Flow.class, FLOW_KEY).build();
+
+    private static final InstanceIdentifier<TableFeatures> NODE_0_TABLE_FEATURES_PATH = InstanceIdentifier
+            .builder(NODE_0_TABLE_PATH).child(TableFeatures.class, TABLE_FEATURES_KEY).build();
+
+    private static final TableFeatures TABLE_FEATURES = new TableFeaturesBuilder()//
+            .setKey(TABLE_FEATURES_KEY) //
+            .setName("Foo") //
+            .setMaxEntries(1000L) //
+            .build();
+
+    private static final Flow FLOW = new FlowBuilder() //
+            .setKey(FLOW_KEY) //
+            .setBarrier(true) //
+            .setStrict(true) //
+            .build();
+
+    @Test
+    public void testSepareteWrites() throws InterruptedException, TimeoutException, ExecutionException {
+
+        DataProviderService dataBroker = testContext.getBindingDataBroker();
+
+        final SettableFuture<DataChangeEvent<InstanceIdentifier<?>, DataObject>> eventFuture = SettableFuture.create();
+        dataBroker.registerDataChangeListener(DEEP_WILDCARDED_PATH, new DataChangeListener() {
+
+            @Override
+            public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
+                eventFuture.set(dataChangeEvent);
+            }
+        });
+
+        DataModificationTransaction transaction = dataBroker.beginTransaction();
+        transaction.putOperationalData(NODE_0_TABLE_FEATURES_PATH, TABLE_FEATURES);
+        transaction.putOperationalData(NODE_0_FLOW_PATH, FLOW);
+        transaction.putOperationalData(NODE_1_FLOW_PATH, FLOW);
+        transaction.commit().get();
+
+        DataChangeEvent<InstanceIdentifier<?>, DataObject> event = eventFuture.get(1000, TimeUnit.MILLISECONDS);
+
+        validateEvent(event);
+    }
+
+    @Test
+    public void testWriteByReplace() throws InterruptedException, TimeoutException, ExecutionException {
+
+        DataProviderService dataBroker = testContext.getBindingDataBroker();
+
+        final SettableFuture<DataChangeEvent<InstanceIdentifier<?>, DataObject>> eventFuture = SettableFuture.create();
+        dataBroker.registerDataChangeListener(DEEP_WILDCARDED_PATH, new DataChangeListener() {
+
+            @Override
+            public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
+                eventFuture.set(dataChangeEvent);
+            }
+        });
+
+        DataModificationTransaction tableTx = dataBroker.beginTransaction();
+        tableTx.putOperationalData(NODE_0_TABLE_FEATURES_PATH, TABLE_FEATURES);
+        tableTx.commit().get();
+
+        assertFalse(eventFuture.isDone());
+
+        DataModificationTransaction flowTx = dataBroker.beginTransaction();
+
+        Table table = new TableBuilder() //
+                .setKey(TABLE_0_KEY) //
+                .setFlow(Collections.singletonList(FLOW)) //
+                .build();
+
+        flowTx.putOperationalData(NODE_0_TABLE_PATH, table);
+        flowTx.putOperationalData(NODE_1_FLOW_PATH, FLOW);
+        flowTx.commit().get();
+
+        validateEvent(eventFuture.get(1000, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testNoChangeOnReplaceWithSameValue() throws InterruptedException, TimeoutException, ExecutionException {
+
+        DataProviderService dataBroker = testContext.getBindingDataBroker();
+
+        // We wrote initial state NODE_0_FLOW
+        DataModificationTransaction transaction = dataBroker.beginTransaction();
+        transaction.putOperationalData(NODE_0_FLOW_PATH, FLOW);
+        transaction.commit().get();
+
+        // We registered DataChangeListener
+        final SettableFuture<DataChangeEvent<InstanceIdentifier<?>, DataObject>> eventFuture = SettableFuture.create();
+        dataBroker.registerDataChangeListener(DEEP_WILDCARDED_PATH, new DataChangeListener() {
+
+            @Override
+            public void onDataChanged(final DataChangeEvent<InstanceIdentifier<?>, DataObject> dataChangeEvent) {
+                eventFuture.set(dataChangeEvent);
+            }
+        });
+        assertFalse(eventFuture.isDone());
+
+        DataModificationTransaction secondTx = dataBroker.beginTransaction();
+        secondTx.putOperationalData(NODE_0_FLOW_PATH, FLOW);
+        secondTx.putOperationalData(NODE_1_FLOW_PATH, FLOW);
+        secondTx.commit().get();
+
+        DataChangeEvent<InstanceIdentifier<?>, DataObject> event = (eventFuture.get(1000, TimeUnit.MILLISECONDS));
+        assertNotNull(event);
+        // Data change should contains NODE_1 Flow - which was added
+        assertTrue(event.getCreatedOperationalData().containsKey(NODE_1_FLOW_PATH));
+        // Data change must not containe NODE_0 Flow which was replaced with same value.
+        assertFalse(event.getUpdatedOperationalData().containsKey(NODE_0_FLOW_PATH));
+    }
+
+    private static void validateEvent(final DataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
+        assertNotNull(event);
+        assertTrue(event.getCreatedOperationalData().containsKey(NODE_1_FLOW_PATH));
+        assertTrue(event.getCreatedOperationalData().containsKey(NODE_0_FLOW_PATH));
+        assertFalse(event.getCreatedOperationalData().containsKey(NODE_0_TABLE_FEATURES_PATH));
+    }
+
+}
index e1fc3c3..e52e196 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.AugmentationIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.Node;
 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
@@ -54,17 +53,17 @@ public class DataNormalizer {
         Iterator<PathArgument> arguments = legacy.getPath().iterator();
 
         try {
-            while ( arguments.hasNext() ) {
+            while (arguments.hasNext()) {
                 PathArgument legacyArg = arguments.next();
                 currentOp = currentOp.getChild(legacyArg);
-                checkArgument(currentOp != null, "Legacy Instance Identifier %s is not correct. Normalized Instance Identifier so far %s",legacy,normalizedArgs.build());
+                checkArgument(currentOp != null,
+                        "Legacy Instance Identifier %s is not correct. Normalized Instance Identifier so far %s",
+                        legacy, normalizedArgs.build());
                 while (currentOp.isMixin()) {
                     normalizedArgs.add(currentOp.getIdentifier());
                     currentOp = currentOp.getChild(legacyArg.getNodeType());
                 }
-                if(arguments.hasNext() || (!currentOp.isKeyedEntry() || legacyArg instanceof NodeIdentifierWithPredicates || legacyArg instanceof NodeWithValue)) {
-                    normalizedArgs.add(legacyArg);
-                }
+                normalizedArgs.add(legacyArg);
             }
         } catch (DataNormalizationException e) {
             throw new IllegalArgumentException(String.format("Failed to normalize path %s", legacy), e);
@@ -73,11 +72,13 @@ public class DataNormalizer {
         return new InstanceIdentifier(normalizedArgs.build());
     }
 
-    public Map.Entry<InstanceIdentifier,NormalizedNode<?, ?>> toNormalized(final Map.Entry<InstanceIdentifier,CompositeNode> legacy) {
+    public Map.Entry<InstanceIdentifier, NormalizedNode<?, ?>> toNormalized(
+            final Map.Entry<InstanceIdentifier, CompositeNode> legacy) {
         return toNormalized(legacy.getKey(), legacy.getValue());
     }
 
-    public Map.Entry<InstanceIdentifier,NormalizedNode<?, ?>> toNormalized(final InstanceIdentifier legacyPath, final CompositeNode legacyData) {
+    public Map.Entry<InstanceIdentifier, NormalizedNode<?, ?>> toNormalized(final InstanceIdentifier legacyPath,
+            final CompositeNode legacyData) {
 
         InstanceIdentifier normalizedPath = toNormalized(legacyPath);
 
@@ -86,7 +87,8 @@ public class DataNormalizer {
             try {
                 currentOp = currentOp.getChild(arg);
             } catch (DataNormalizationException e) {
-                throw new IllegalArgumentException(String.format("Failed to validate normalized path %s", normalizedPath), e);
+                throw new IllegalArgumentException(String.format("Failed to validate normalized path %s",
+                        normalizedPath), e);
             }
         }
 
@@ -101,7 +103,7 @@ public class DataNormalizer {
                 throw new IllegalArgumentException(String.format("Failed to get child operation for %s", legacyData), e);
             }
 
-            if(potentialOp.getIdentifier() instanceof AugmentationIdentifier) {
+            if (potentialOp.getIdentifier() instanceof AugmentationIdentifier) {
                 currentOp = potentialOp;
                 ArrayList<PathArgument> reworkedArgs = new ArrayList<>(normalizedPath.getPath());
                 reworkedArgs.add(potentialOp.getIdentifier());
@@ -111,7 +113,8 @@ public class DataNormalizer {
 
         Preconditions.checkArgument(currentOp != null,
                 "Instance Identifier %s does not reference correct schema Node.", normalizedPath);
-        return new AbstractMap.SimpleEntry<InstanceIdentifier,NormalizedNode<?, ?>>(normalizedPath,currentOp.normalize(legacyData));
+        return new AbstractMap.SimpleEntry<InstanceIdentifier, NormalizedNode<?, ?>>(normalizedPath,
+                currentOp.normalize(legacyData));
     }
 
     public InstanceIdentifier toLegacy(final InstanceIdentifier normalized) {
@@ -149,9 +152,8 @@ public class DataNormalizer {
     public static Node<?> toLegacy(final NormalizedNode<?, ?> node) {
         if (node instanceof MixinNode) {
             /**
-             * Direct reading of MixinNodes is not supported,
-             * since it is not possible in legacy APIs create pointer
-             * to Mixin Nodes.
+             * Direct reading of MixinNodes is not supported, since it is not
+             * possible in legacy APIs create pointer to Mixin Nodes.
              *
              */
             return null;
@@ -193,8 +195,8 @@ public class DataNormalizer {
             final NormalizedNodeContainer<?, ?, NormalizedNode<?, ?>> mixin) {
         ArrayList<Node<?>> ret = new ArrayList<>();
         for (NormalizedNode<?, ?> child : mixin.getValue()) {
-            if(child instanceof MixinNode && child instanceof NormalizedNodeContainer<?, ?, ?>) {
-                Iterables.addAll(ret,toLegacyNodesFromMixin((NormalizedNodeContainer) child));
+            if (child instanceof MixinNode && child instanceof NormalizedNodeContainer<?, ?, ?>) {
+                Iterables.addAll(ret, toLegacyNodesFromMixin((NormalizedNodeContainer) child));
             } else {
                 ret.add(toLegacy(child));
             }
index 3c6a3d6..86f08de 100644 (file)
@@ -3,22 +3,32 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 import java.util.Map;
 import java.util.Set;
 
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 public final class DOMImmutableDataChangeEvent implements
         AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
 
+
+    private static final RemoveEventFactory REMOVE_EVENT_FACTORY = new RemoveEventFactory();
+    private static final CreateEventFactory CREATE_EVENT_FACTORY = new CreateEventFactory();
+
     private final NormalizedNode<?, ?> original;
     private final NormalizedNode<?, ?> updated;
     private final Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> originalData;
     private final Map<InstanceIdentifier, NormalizedNode<?, ?>> createdData;
     private final Map<InstanceIdentifier, NormalizedNode<?, ?>> updatedData;
     private final Set<InstanceIdentifier> removedPaths;
+    private final DataChangeScope scope;
+
+
 
     private DOMImmutableDataChangeEvent(final Builder change) {
         original = change.before;
@@ -27,10 +37,15 @@ public final class DOMImmutableDataChangeEvent implements
         createdData = change.created.build();
         updatedData = change.updated.build();
         removedPaths = change.removed.build();
+        scope = change.scope;
+    }
+
+    public static final Builder builder(final DataChangeScope scope) {
+        return new Builder(scope);
     }
 
-    public static final Builder builder() {
-        return new Builder();
+    protected DataChangeScope getScope() {
+        return scope;
     }
 
     @Override
@@ -69,8 +84,44 @@ public final class DOMImmutableDataChangeEvent implements
                 + ", removed=" + removedPaths + "]";
     }
 
+    /**
+     * Simple event factory which creates event based on path and data
+     *
+     *
+     */
+    public interface SimpleEventFactory {
+        DOMImmutableDataChangeEvent create(InstanceIdentifier path, NormalizedNode<PathArgument,?> data);
+    }
+
+    /**
+     * Event factory which takes after state and creates event for it.
+     *
+     * Factory for events based on path and after state.
+     * After state is set as {@link #getUpdatedSubtree()} and is path,
+     * state mapping is also present in {@link #getUpdatedData()}.
+     *
+     * @return
+     */
+    public static final SimpleEventFactory getCreateEventFactory() {
+        return CREATE_EVENT_FACTORY;
+    }
+
+    /**
+     * Event factory which takes before state and creates event for it.
+     *
+     * Factory for events based on path and after state.
+     * After state is set as {@link #getOriginalSubtree()} and is path,
+     * state mapping is also present in {@link #getOriginalSubtree()}.
+     *
+     * Path is present in {@link #getRemovedPaths()}.
+     * @return
+     */
+    public static final SimpleEventFactory getRemoveEventFactory() {
+        return REMOVE_EVENT_FACTORY;
+    }
     public static class Builder {
 
+        public DataChangeScope scope;
         private NormalizedNode<?, ?> after;
         private NormalizedNode<?, ?> before;
 
@@ -79,8 +130,9 @@ public final class DOMImmutableDataChangeEvent implements
         private final ImmutableMap.Builder<InstanceIdentifier, NormalizedNode<?, ?>> updated = ImmutableMap.builder();
         private final ImmutableSet.Builder<InstanceIdentifier> removed = ImmutableSet.builder();
 
-        private Builder() {
-
+        private Builder(final DataChangeScope scope) {
+            Preconditions.checkNotNull(scope, "Data change scope should not be null.");
+            this.scope = scope;
         }
 
         public Builder setAfter(final NormalizedNode<?, ?> node) {
@@ -126,4 +178,27 @@ public final class DOMImmutableDataChangeEvent implements
         }
     }
 
+    private static final class RemoveEventFactory implements SimpleEventFactory {
+
+        @Override
+        public DOMImmutableDataChangeEvent create(final InstanceIdentifier path, final NormalizedNode<PathArgument, ?> data) {
+            return builder(DataChangeScope.BASE) //
+                    .setBefore(data) //
+                    .addRemoved(path, data) //
+                    .build();
+        }
+
+    }
+
+    private static final class CreateEventFactory implements SimpleEventFactory {
+
+        @Override
+        public DOMImmutableDataChangeEvent create(final InstanceIdentifier path, final NormalizedNode<PathArgument, ?> data) {
+            return builder(DataChangeScope.BASE) //
+                    .setAfter(data) //
+                    .addCreated(path, data) //
+                    .build();
+        }
+    }
+
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java
deleted file mode 100644 (file)
index df2725d..0000000
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.opendaylight.controller.md.sal.dom.store.impl;
-
-import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.builder;
-import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.append;
-import static org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils.getChild;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
-import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-public class DataChangeEventResolver {
-    private static final Logger LOG = LoggerFactory.getLogger(DataChangeEventResolver.class);
-    private static final DOMImmutableDataChangeEvent NO_CHANGE = builder().build();
-    private final ImmutableList.Builder<ChangeListenerNotifyTask> tasks = ImmutableList.builder();
-    private InstanceIdentifier rootPath;
-    private ListenerTree listenerRoot;
-    private NodeModification modificationRoot;
-    private Optional<StoreMetadataNode> beforeRoot;
-    private Optional<StoreMetadataNode> afterRoot;
-
-    protected InstanceIdentifier getRootPath() {
-        return rootPath;
-    }
-
-    protected DataChangeEventResolver setRootPath(final InstanceIdentifier rootPath) {
-        this.rootPath = rootPath;
-        return this;
-    }
-
-    protected ListenerTree getListenerRoot() {
-        return listenerRoot;
-    }
-
-    protected DataChangeEventResolver setListenerRoot(final ListenerTree listenerRoot) {
-        this.listenerRoot = listenerRoot;
-        return this;
-    }
-
-    protected NodeModification getModificationRoot() {
-        return modificationRoot;
-    }
-
-    protected DataChangeEventResolver setModificationRoot(final NodeModification modificationRoot) {
-        this.modificationRoot = modificationRoot;
-        return this;
-    }
-
-    protected Optional<StoreMetadataNode> getBeforeRoot() {
-        return beforeRoot;
-    }
-
-    protected DataChangeEventResolver setBeforeRoot(final Optional<StoreMetadataNode> beforeRoot) {
-        this.beforeRoot = beforeRoot;
-        return this;
-    }
-
-    protected Optional<StoreMetadataNode> getAfterRoot() {
-        return afterRoot;
-    }
-
-    protected DataChangeEventResolver setAfterRoot(final Optional<StoreMetadataNode> afterRoot) {
-        this.afterRoot = afterRoot;
-        return this;
-    }
-
-    public Iterable<ChangeListenerNotifyTask> resolve() {
-        LOG.trace("Resolving events for {}", modificationRoot);
-
-        try (final Walker w = listenerRoot.getWalker()) {
-            resolveAnyChangeEvent(rootPath, Optional.of(w.getRootNode()), modificationRoot, beforeRoot, afterRoot);
-            return tasks.build();
-        }
-    }
-
-    private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final InstanceIdentifier path,
-            final Optional<ListenerTree.Node> listeners, final NodeModification modification,
-            final Optional<StoreMetadataNode> before, final Optional<StoreMetadataNode> after) {
-        // No listeners are present in listener registration subtree
-        // no before and after state is present
-        if (!before.isPresent() && !after.isPresent()) {
-            return NO_CHANGE;
-        }
-        switch (modification.getModificationType()) {
-        case SUBTREE_MODIFIED:
-            return resolveSubtreeChangeEvent(path, listeners, modification, before.get(), after.get());
-        case WRITE:
-            if (before.isPresent()) {
-                return resolveReplacedEvent(path, listeners, modification, before.get(), after.get());
-            } else {
-                return resolveCreateEvent(path, listeners, after.get());
-            }
-        case DELETE:
-            return resolveDeleteEvent(path, listeners, before.get());
-        default:
-            return NO_CHANGE;
-        }
-
-    }
-
-    /**
-     * Resolves create events deep down the interest listener tree.
-     *
-     *
-     * @param path
-     * @param listeners
-     * @param afterState
-     * @return
-     */
-    private DOMImmutableDataChangeEvent resolveCreateEvent(final InstanceIdentifier path,
-            final Optional<ListenerTree.Node> listeners, final StoreMetadataNode afterState) {
-        final NormalizedNode<?, ?> node = afterState.getData();
-        Builder builder = builder().setAfter(node).addCreated(path, node);
-
-        for (StoreMetadataNode child : afterState.getChildren()) {
-            PathArgument childId = child.getIdentifier();
-            Optional<ListenerTree.Node> childListeners = getChild(listeners, childId);
-
-            InstanceIdentifier childPath = StoreUtils.append(path, childId);
-            builder.merge(resolveCreateEvent(childPath, childListeners, child));
-        }
-
-        return addNotifyTask(listeners, builder.build());
-    }
-
-    private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path,
-            final Optional<ListenerTree.Node> listeners, final StoreMetadataNode beforeState) {
-        final NormalizedNode<?, ?> node = beforeState.getData();
-        Builder builder = builder().setBefore(node).addRemoved(path, node);
-
-        for (StoreMetadataNode child : beforeState.getChildren()) {
-            PathArgument childId = child.getIdentifier();
-            Optional<ListenerTree.Node> childListeners = getChild(listeners, childId);
-            InstanceIdentifier childPath = StoreUtils.append(path, childId);
-            builder.merge(resolveDeleteEvent(childPath, childListeners, child));
-        }
-        return addNotifyTask(listeners, builder.build());
-    }
-
-    private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path,
-            final Optional<ListenerTree.Node> listeners, final NodeModification modification,
-            final StoreMetadataNode before, final StoreMetadataNode after) {
-
-        Builder one = builder().setBefore(before.getData()).setAfter(after.getData());
-
-        Builder subtree = builder();
-
-        for (NodeModification childMod : modification.getModifications()) {
-            PathArgument childId = childMod.getIdentifier();
-            InstanceIdentifier childPath = append(path, childId);
-            Optional<ListenerTree.Node> childListen = getChild(listeners, childId);
-
-            Optional<StoreMetadataNode> childBefore = before.getChild(childId);
-            Optional<StoreMetadataNode> childAfter = after.getChild(childId);
-
-            switch (childMod.getModificationType()) {
-            case WRITE:
-            case DELETE:
-                one.merge(resolveAnyChangeEvent(childPath, childListen, childMod, childBefore, childAfter));
-                break;
-            case SUBTREE_MODIFIED:
-                subtree.merge(resolveSubtreeChangeEvent(childPath, childListen, childMod, childBefore.get(),
-                        childAfter.get()));
-                break;
-            case UNMODIFIED:
-                // no-op
-                break;
-            }
-        }
-        DOMImmutableDataChangeEvent oneChangeEvent = one.build();
-        subtree.merge(oneChangeEvent);
-        DOMImmutableDataChangeEvent subtreeEvent = subtree.build();
-        if (listeners.isPresent()) {
-            addNotifyTask(listeners.get(), DataChangeScope.ONE, oneChangeEvent);
-            addNotifyTask(listeners.get(), DataChangeScope.SUBTREE, subtreeEvent);
-        }
-        return subtreeEvent;
-    }
-
-    private DOMImmutableDataChangeEvent resolveReplacedEvent(final InstanceIdentifier path,
-            final Optional<ListenerTree.Node> listeners, final NodeModification modification,
-            final StoreMetadataNode before, final StoreMetadataNode after) {
-        // FIXME Add task
-        return builder().build();
-    }
-
-    private DOMImmutableDataChangeEvent addNotifyTask(final Optional<ListenerTree.Node> listeners, final DOMImmutableDataChangeEvent event) {
-        if (listeners.isPresent()) {
-            final Collection<DataChangeListenerRegistration<?>> l = listeners.get().getListeners();
-            if (!l.isEmpty()) {
-                tasks.add(new ChangeListenerNotifyTask(ImmutableSet.copyOf(l), event));
-            }
-        }
-
-        return event;
-    }
-
-    private void addNotifyTask(final ListenerTree.Node listenerRegistrationNode, final DataChangeScope scope,
-            final DOMImmutableDataChangeEvent event) {
-        Collection<DataChangeListenerRegistration<?>> potential = listenerRegistrationNode.getListeners();
-        if(!potential.isEmpty()) {
-            final Set<DataChangeListenerRegistration<?>> toNotify = new HashSet<>(potential.size());
-            for(DataChangeListenerRegistration<?> listener : potential) {
-                if(scope.equals(listener.getScope())) {
-                    toNotify.add(listener);
-                }
-            }
-
-            if (!toNotify.isEmpty()) {
-                tasks.add(new ChangeListenerNotifyTask(toNotify, event));
-            }
-        }
-    }
-
-    public static DataChangeEventResolver create() {
-        return new DataChangeEventResolver();
-    }
-}
index a854c48..a66fa7f 100644 (file)
@@ -120,7 +120,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             if (currentState.isPresent()) {
                 final NormalizedNode<?, ?> data = currentState.get().getData();
 
-                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
+                final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
                         .setAfter(data) //
                         .addCreated(path, data) //
                         .build();
@@ -149,7 +149,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     }
 
     private void commit(final DataAndMetadataSnapshot currentSnapshot,
-            final StoreMetadataNode newDataTree, final DataChangeEventResolver listenerResolver) {
+            final StoreMetadataNode newDataTree, final ResolveDataChangeEventsTask listenerResolver) {
         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
 
         if(LOG.isTraceEnabled()) {
@@ -168,7 +168,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
 
-            for (ChangeListenerNotifyTask task : listenerResolver.resolve()) {
+            for (ChangeListenerNotifyTask task : listenerResolver.call()) {
                 executor.submit(task);
             }
         }
@@ -306,7 +306,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         private DataAndMetadataSnapshot storeSnapshot;
         private Optional<StoreMetadataNode> proposedSubtree;
-        private DataChangeEventResolver listenerResolver;
+        private ResolveDataChangeEventsTask listenerResolver;
 
         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
             this.transaction = writeTransaction;
@@ -347,7 +347,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
                             increase(metadataTree.getSubtreeVersion()));
 
-                    listenerResolver = DataChangeEventResolver.create() //
+                    listenerResolver = ResolveDataChangeEventsTask.create() //
                             .setRootPath(PUBLIC_ROOT_PATH) //
                             .setBeforeRoot(Optional.of(metadataTree)) //
                             .setAfterRoot(proposedSubtree) //
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
new file mode 100644 (file)
index 0000000..befeaf4
--- /dev/null
@@ -0,0 +1,543 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.builder;
+import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.append;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodeContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+
+/**
+ *
+ * Resolve Data Change Events based on modifications and listeners
+ *
+ * Computes data change events for all affected registered listeners in data
+ * tree.
+ *
+ * Prerequisites for computation is to set all parameters properly:
+ * <ul>
+ * <li>{@link #setRootPath(InstanceIdentifier)} - Root path of datastore
+ * <li>{@link #setListenerRoot(ListenerTree)} - Root of listener registration
+ * tree, which contains listeners to be notified
+ * <li>{@link #setModificationRoot(NodeModification)} - Modification root, for
+ * which events should be computed
+ * <li>{@link #setBeforeRoot(Optional)} - State of before modification occurred
+ * <li>{@link #setAfterRoot(Optional)} - State of after modification occurred
+ * </ul>
+ *
+ */
+public class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListenerNotifyTask>> {
+    private static final Logger LOG = LoggerFactory.getLogger(ResolveDataChangeEventsTask.class);
+    private static final DOMImmutableDataChangeEvent NO_CHANGE = builder(DataChangeScope.BASE).build();
+
+    private InstanceIdentifier rootPath;
+    private ListenerTree listenerRoot;
+    private NodeModification modificationRoot;
+    private Optional<StoreMetadataNode> beforeRoot;
+    private Optional<StoreMetadataNode> afterRoot;
+    private final Multimap<ListenerTree.Node, DOMImmutableDataChangeEvent> events = HashMultimap.create();
+
+    protected InstanceIdentifier getRootPath() {
+        return rootPath;
+    }
+
+    protected ResolveDataChangeEventsTask setRootPath(final InstanceIdentifier rootPath) {
+        this.rootPath = rootPath;
+        return this;
+    }
+
+    protected ListenerTree getListenerRoot() {
+        return listenerRoot;
+    }
+
+    protected ResolveDataChangeEventsTask setListenerRoot(final ListenerTree listenerRoot) {
+        this.listenerRoot = listenerRoot;
+        return this;
+    }
+
+    protected NodeModification getModificationRoot() {
+        return modificationRoot;
+    }
+
+    protected ResolveDataChangeEventsTask setModificationRoot(final NodeModification modificationRoot) {
+        this.modificationRoot = modificationRoot;
+        return this;
+    }
+
+    protected Optional<StoreMetadataNode> getBeforeRoot() {
+        return beforeRoot;
+    }
+
+    protected ResolveDataChangeEventsTask setBeforeRoot(final Optional<StoreMetadataNode> beforeRoot) {
+        this.beforeRoot = beforeRoot;
+        return this;
+    }
+
+    protected Optional<StoreMetadataNode> getAfterRoot() {
+        return afterRoot;
+    }
+
+    protected ResolveDataChangeEventsTask setAfterRoot(final Optional<StoreMetadataNode> afterRoot) {
+        this.afterRoot = afterRoot;
+        return this;
+    }
+
+    /**
+     * Resolves and creates Notification Tasks
+     *
+     * Implementation of done as Map-Reduce with two steps: 1. resolving events
+     * and their mapping to listeners 2. merging events affecting same listener
+     *
+     * @return Iterable of Notification Tasks which needs to be executed in
+     *         order to delivery data change events.
+     */
+    @Override
+    public Iterable<ChangeListenerNotifyTask> call() {
+        LOG.trace("Resolving events for {}", modificationRoot);
+
+        try (final Walker w = listenerRoot.getWalker()) {
+            resolveAnyChangeEvent(rootPath, Collections.singleton(w.getRootNode()), modificationRoot, beforeRoot,
+                    afterRoot);
+            return createNotificationTasks();
+        }
+    }
+
+    /**
+     *
+     * Walks map of listeners to data change events, creates notification
+     * delivery tasks.
+     *
+     * Walks map of registered and affected listeners and creates notification
+     * tasks from set of listeners and events to be delivered.
+     *
+     * If set of listeners has more then one event (applicable to wildcarded
+     * listeners), merges all data change events into one, final which contains
+     * all separate updates.
+     *
+     * Dispatch between merge variant and reuse variant of notification task is
+     * done in
+     * {@link #addNotificationTask(com.google.common.collect.ImmutableList.Builder, Node, Collection)}
+     *
+     * @return Collection of notification tasks.
+     */
+    private Collection<ChangeListenerNotifyTask> createNotificationTasks() {
+        ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder = ImmutableList.builder();
+        for (Entry<ListenerTree.Node, Collection<DOMImmutableDataChangeEvent>> entry : events.asMap().entrySet()) {
+            addNotificationTask(taskListBuilder, entry.getKey(), entry.getValue());
+        }
+        return taskListBuilder.build();
+    }
+
+    /**
+     * Adds notification task to task list.
+     *
+     * If entry collection contains one event, this event is reused and added to
+     * notification tasks for listeners (see
+     * {@link #addNotificationTaskByScope(com.google.common.collect.ImmutableList.Builder, Node, DOMImmutableDataChangeEvent)}
+     * . Otherwise events are merged by scope and distributed between listeners
+     * to particular scope. See
+     * {@link #addNotificationTasksAndMergeEvents(com.google.common.collect.ImmutableList.Builder, Node, Collection)}
+     * .
+     *
+     * @param taskListBuilder
+     * @param listeners
+     * @param entries
+     */
+    private static void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
+            final ListenerTree.Node listeners, final Collection<DOMImmutableDataChangeEvent> entries) {
+
+        if (!entries.isEmpty()) {
+            if (entries.size() == 1) {
+                addNotificationTaskByScope(taskListBuilder, listeners, Iterables.getOnlyElement(entries));
+            } else {
+                addNotificationTasksAndMergeEvents(taskListBuilder, listeners, entries);
+            }
+        }
+    }
+
+    /**
+     *
+     * Add notification deliveries task to the listener.
+     *
+     *
+     * @param taskListBuilder
+     * @param listeners
+     * @param event
+     */
+    private static void addNotificationTaskByScope(
+            final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
+            final DOMImmutableDataChangeEvent event) {
+        DataChangeScope eventScope = event.getScope();
+        for (DataChangeListenerRegistration<?> listenerReg : listeners.getListeners()) {
+            DataChangeScope listenerScope = listenerReg.getScope();
+            List<DataChangeListenerRegistration<?>> listenerSet = Collections
+                    .<DataChangeListenerRegistration<?>> singletonList(listenerReg);
+            if (eventScope == DataChangeScope.BASE) {
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+            } else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) {
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+            } else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) {
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+            }
+        }
+    }
+
+    /**
+     *
+     * Add notification tasks with merged event
+     *
+     * Separate Events by scope and creates merged notification tasks for each
+     * and every scope which is present.
+     *
+     * Adds merged events to task list based on scope requested by client.
+     *
+     * @param taskListBuilder
+     * @param listeners
+     * @param entries
+     */
+    private static void addNotificationTasksAndMergeEvents(
+            final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
+            final Collection<DOMImmutableDataChangeEvent> entries) {
+
+        final Builder baseBuilder = builder(DataChangeScope.BASE);
+        final Builder oneBuilder = builder(DataChangeScope.ONE);
+        final Builder subtreeBuilder = builder(DataChangeScope.SUBTREE);
+
+        boolean baseModified = false;
+        boolean oneModified = false;
+        boolean subtreeModified = false;
+        for (final DOMImmutableDataChangeEvent entry : entries) {
+            switch (entry.getScope()) {
+            // Absence of breaks is intentional here. Subtree contains base and
+            // one, one also contains base
+            case BASE:
+                baseBuilder.merge(entry);
+                baseModified = true;
+            case ONE:
+                oneBuilder.merge(entry);
+                oneModified = true;
+            case SUBTREE:
+                subtreeBuilder.merge(entry);
+                subtreeModified = true;
+            }
+        }
+
+        if (baseModified) {
+            addNotificationTaskExclusively(taskListBuilder, listeners, baseBuilder.build());
+        }
+        if (oneModified) {
+            addNotificationTaskExclusively(taskListBuilder, listeners, oneBuilder.build());
+        }
+        if (subtreeModified) {
+            addNotificationTaskExclusively(taskListBuilder, listeners, subtreeBuilder.build());
+        }
+    }
+
+    private static void addNotificationTaskExclusively(
+            final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final Node listeners,
+            final DOMImmutableDataChangeEvent event) {
+        for (DataChangeListenerRegistration<?> listener : listeners.getListeners()) {
+            if (listener.getScope() == event.getScope()) {
+                Set<DataChangeListenerRegistration<?>> listenerSet = Collections
+                        .<DataChangeListenerRegistration<?>> singleton(listener);
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+            }
+        }
+    }
+
+    /**
+     * Resolves data change event for supplied node
+     *
+     * @param path
+     *            Path to current node in tree
+     * @param listeners
+     *            Collection of Listener registration nodes interested in
+     *            subtree
+     * @param modification
+     *            Modification of current node
+     * @param before
+     *            - Original (before) state of current node
+     * @param after
+     *            - After state of current node
+     * @return Data Change Event of this node and all it's children
+     */
+    private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final InstanceIdentifier path,
+            final Collection<ListenerTree.Node> listeners, final NodeModification modification,
+            final Optional<StoreMetadataNode> before, final Optional<StoreMetadataNode> after) {
+        // No listeners are present in listener registration subtree
+        // no before and after state is present
+        if (!before.isPresent() && !after.isPresent()) {
+            return NO_CHANGE;
+        }
+        switch (modification.getModificationType()) {
+        case SUBTREE_MODIFIED:
+            return resolveSubtreeChangeEvent(path, listeners, modification, before.get(), after.get());
+        case WRITE:
+            if (before.isPresent()) {
+                return resolveReplacedEvent(path, listeners, before.get().getData(), after.get().getData());
+            } else {
+                return resolveCreateEvent(path, listeners, after.get());
+            }
+        case DELETE:
+            return resolveDeleteEvent(path, listeners, before.get());
+        default:
+            return NO_CHANGE;
+        }
+
+    }
+
+    private DOMImmutableDataChangeEvent resolveReplacedEvent(final InstanceIdentifier path,
+            final Collection<Node> listeners, final NormalizedNode<?, ?> beforeData,
+            final NormalizedNode<?, ?> afterData) {
+
+        if (beforeData instanceof NormalizedNodeContainer<?, ?, ?> && !listeners.isEmpty()) {
+            // Node is container (contains child) and we have interested
+            // listeners registered for it, that means we need to do
+            // resolution of changes on children level and can not
+            // shortcut resolution.
+
+            @SuppressWarnings("unchecked")
+            NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) beforeData;
+            @SuppressWarnings("unchecked")
+            NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) afterData;
+            return resolveNodeContainerReplaced(path, listeners, beforeCont, afterCont);
+        } else if (!beforeData.equals(afterData)) {
+            // Node is either of Leaf type (does not contain child nodes)
+            // or we do not have listeners, so normal equals method is
+            // sufficient for determining change.
+
+            DOMImmutableDataChangeEvent event = builder(DataChangeScope.BASE).setBefore(beforeData).setAfter(afterData)
+                    .addUpdated(path, beforeData, afterData).build();
+            addPartialTask(listeners, event);
+            return event;
+        } else {
+            return NO_CHANGE;
+        }
+    }
+
+    private DOMImmutableDataChangeEvent resolveNodeContainerReplaced(final InstanceIdentifier path,
+            final Collection<Node> listeners,
+            final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> beforeCont,
+            final NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> afterCont) {
+        final Set<PathArgument> alreadyProcessed = new HashSet<>();
+        final List<DOMImmutableDataChangeEvent> childChanges = new LinkedList<>();
+
+        DataChangeScope potentialScope = DataChangeScope.BASE;
+        // We look at all children from before and compare it with after state.
+        for (NormalizedNode<PathArgument, ?> beforeChild : beforeCont.getValue()) {
+            PathArgument childId = beforeChild.getIdentifier();
+            alreadyProcessed.add(childId);
+            InstanceIdentifier childPath = append(path, childId);
+            Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
+            Optional<NormalizedNode<PathArgument, ?>> afterChild = afterCont.getChild(childId);
+            DOMImmutableDataChangeEvent childChange = resolveNodeContainerChildUpdated(childPath, childListeners,
+                    beforeChild, afterChild);
+            // If change is empty (equals to NO_CHANGE)
+            if (childChange != NO_CHANGE) {
+                childChanges.add(childChange);
+            }
+
+        }
+
+        for (NormalizedNode<PathArgument, ?> afterChild : afterCont.getValue()) {
+            PathArgument childId = afterChild.getIdentifier();
+            if (!alreadyProcessed.contains(childId)) {
+                // We did not processed that child already
+                // and it was not present in previous loop, that means it is
+                // created.
+                Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
+                InstanceIdentifier childPath = append(path,childId);
+                childChanges.add(resolveSameEventRecursivelly(childPath , childListeners, afterChild,
+                        DOMImmutableDataChangeEvent.getCreateEventFactory()));
+            }
+        }
+        if (childChanges.isEmpty()) {
+            return NO_CHANGE;
+        }
+
+        Builder eventBuilder = builder(potentialScope) //
+                .setBefore(beforeCont) //
+                .setAfter(afterCont);
+        for (DOMImmutableDataChangeEvent childChange : childChanges) {
+            eventBuilder.merge(childChange);
+        }
+
+        DOMImmutableDataChangeEvent replaceEvent = eventBuilder.build();
+        addPartialTask(listeners, replaceEvent);
+        return replaceEvent;
+    }
+
+    private DOMImmutableDataChangeEvent resolveNodeContainerChildUpdated(final InstanceIdentifier path,
+            final Collection<Node> listeners, final NormalizedNode<PathArgument, ?> before,
+            final Optional<NormalizedNode<PathArgument, ?>> after) {
+
+        if (after.isPresent()) {
+            // REPLACE or SUBTREE Modified
+            return resolveReplacedEvent(path, listeners, before, after.get());
+
+        } else {
+            // AFTER state is not present - child was deleted.
+            return resolveSameEventRecursivelly(path, listeners, before,
+                    DOMImmutableDataChangeEvent.getRemoveEventFactory());
+        }
+    }
+
+    /**
+     * Resolves create events deep down the interest listener tree.
+     *
+     *
+     * @param path
+     * @param listeners
+     * @param afterState
+     * @return
+     */
+    private DOMImmutableDataChangeEvent resolveCreateEvent(final InstanceIdentifier path,
+            final Collection<ListenerTree.Node> listeners, final StoreMetadataNode afterState) {
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        final NormalizedNode<PathArgument, ?> node = (NormalizedNode) afterState.getData();
+        return resolveSameEventRecursivelly(path, listeners, node, DOMImmutableDataChangeEvent.getCreateEventFactory());
+    }
+
+    private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path,
+            final Collection<ListenerTree.Node> listeners, final StoreMetadataNode beforeState) {
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        final NormalizedNode<PathArgument, ?> node = (NormalizedNode) beforeState.getData();
+        return resolveSameEventRecursivelly(path, listeners, node, DOMImmutableDataChangeEvent.getRemoveEventFactory());
+    }
+
+    private DOMImmutableDataChangeEvent resolveSameEventRecursivelly(final InstanceIdentifier path,
+            final Collection<Node> listeners, final NormalizedNode<PathArgument, ?> node,
+            final SimpleEventFactory eventFactory) {
+
+        DOMImmutableDataChangeEvent event = eventFactory.create(path, node);
+
+        if (!listeners.isEmpty()) {
+            // We have listeners for this node or it's children, so we will try
+            // to do additional processing
+            if (node instanceof NormalizedNodeContainer<?, ?, ?>) {
+                // Node has children, so we will try to resolve it's children
+                // changes.
+                @SuppressWarnings("unchecked")
+                NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>> container = (NormalizedNodeContainer<?, PathArgument, NormalizedNode<PathArgument, ?>>) node;
+                for (NormalizedNode<PathArgument, ?> child : container.getValue()) {
+                    PathArgument childId = child.getIdentifier();
+                    Collection<Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
+                    if (!childListeners.isEmpty()) {
+                        resolveSameEventRecursivelly(append(path, childId), childListeners, child, eventFactory);
+                    }
+                }
+            }
+            addPartialTask(listeners, event);
+        }
+        return event;
+    }
+
+    private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path,
+            final Collection<ListenerTree.Node> listeners, final NodeModification modification,
+            final StoreMetadataNode before, final StoreMetadataNode after) {
+
+        Builder one = builder(DataChangeScope.ONE).setBefore(before.getData()).setAfter(after.getData());
+
+        Builder subtree = builder(DataChangeScope.SUBTREE);
+
+        for (NodeModification childMod : modification.getModifications()) {
+            PathArgument childId = childMod.getIdentifier();
+            InstanceIdentifier childPath = append(path, childId);
+            Collection<ListenerTree.Node> childListeners = getListenerChildrenWildcarded(listeners, childId);
+
+            Optional<StoreMetadataNode> childBefore = before.getChild(childId);
+            Optional<StoreMetadataNode> childAfter = after.getChild(childId);
+
+            switch (childMod.getModificationType()) {
+            case WRITE:
+            case DELETE:
+                one.merge(resolveAnyChangeEvent(childPath, childListeners, childMod, childBefore, childAfter));
+                break;
+            case SUBTREE_MODIFIED:
+                subtree.merge(resolveSubtreeChangeEvent(childPath, childListeners, childMod, childBefore.get(),
+                        childAfter.get()));
+                break;
+            case UNMODIFIED:
+                // no-op
+                break;
+            }
+        }
+        DOMImmutableDataChangeEvent oneChangeEvent = one.build();
+        subtree.merge(oneChangeEvent);
+        DOMImmutableDataChangeEvent subtreeEvent = subtree.build();
+        if (!listeners.isEmpty()) {
+            addPartialTask(listeners, oneChangeEvent);
+            addPartialTask(listeners, subtreeEvent);
+        }
+        return subtreeEvent;
+    }
+
+    private DOMImmutableDataChangeEvent addPartialTask(final Collection<ListenerTree.Node> listeners,
+            final DOMImmutableDataChangeEvent event) {
+
+        for (ListenerTree.Node listenerNode : listeners) {
+            if (!listenerNode.getListeners().isEmpty()) {
+                events.put(listenerNode, event);
+            }
+        }
+        return event;
+    }
+
+    private static Collection<ListenerTree.Node> getListenerChildrenWildcarded(final Collection<ListenerTree.Node> parentNodes,
+            final PathArgument child) {
+        if (parentNodes.isEmpty()) {
+            return Collections.emptyList();
+        }
+        com.google.common.collect.ImmutableList.Builder<ListenerTree.Node> result = ImmutableList.builder();
+        if (child instanceof NodeWithValue || child instanceof NodeIdentifierWithPredicates) {
+            NodeIdentifier wildcardedIdentifier = new NodeIdentifier(child.getNodeType());
+            addChildrenNodesToBuilder(result, parentNodes, wildcardedIdentifier);
+        }
+        addChildrenNodesToBuilder(result, parentNodes, child);
+        return result.build();
+    }
+
+    private static void addChildrenNodesToBuilder(final ImmutableList.Builder<ListenerTree.Node> result,
+            final Collection<ListenerTree.Node> parentNodes, final PathArgument childIdentifier) {
+        for (ListenerTree.Node node : parentNodes) {
+            Optional<ListenerTree.Node> child = node.getChild(childIdentifier);
+            if (child.isPresent()) {
+                result.add(child.get());
+            }
+        }
+    }
+
+    public static ResolveDataChangeEventsTask create() {
+        return new ResolveDataChangeEventsTask();
+    }
+}
index 83cfcac..f93f40a 100644 (file)
@@ -220,6 +220,13 @@ public final class ListenerTree {
             children.remove(arg);
             removeThisIfUnused();
         }
+
+        @Override
+        public String toString() {
+            return "Node [identifier=" + identifier + ", listeners=" + listeners.size() + ", children=" + children.size() + "]";
+        }
+
+
     }
 
     private abstract static class DataChangeListenerRegistrationImpl<T extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> extends AbstractListenerRegistration<T> //