Bug 499: Added support for change listeners. 43/5743/3
authorTony Tkacik <ttkacik@cisco.com>
Fri, 21 Mar 2014 12:30:45 +0000 (13:30 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Fri, 28 Mar 2014 21:32:40 +0000 (22:32 +0100)
Change-Id: I9e9c46f13f77ac8b1bda459c35724a5e0b24d91f
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMImmutableDataChangeEvent.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataAndMetadataSnapshot.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/StoreUtils.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/TreeNodeUtils.java

index 313a2c3..3fafad7 100644 (file)
@@ -259,21 +259,32 @@ public class DOMDataBrokerImpl implements DOMDataBroker {
         public RpcResult<TransactionStatus> call() throws Exception {
 
             Boolean canCommit = canCommit().get();
-
-            if (canCommit) {
-                try {
-                    preCommit().get();
+            try {
+                if (canCommit) {
                     try {
-                        commit().get();
+                        preCommit().get();
+                        try {
+                            commit().get();
+                            return null;
+                        } catch (InterruptedException | ExecutionException e) {
+                            COORDINATOR_LOG.error("Tx: {} Error during commit", transaction.getIdentifier(), e);
+                        }
+
                     } catch (InterruptedException | ExecutionException e) {
-                        // ERROR
+                        COORDINATOR_LOG.warn("Tx: {} Error during preCommit, starting Abort",
+                                transaction.getIdentifier(), e);
                     }
-
-                } catch (InterruptedException | ExecutionException e) {
+                } else {
                     abort().get();
                 }
-            } else {
+            } catch (InterruptedException | ExecutionException e) {
+                COORDINATOR_LOG.warn("Tx: {} Error during canCommit, starting Abort", transaction.getIdentifier(), e);
+
+            }
+            try {
                 abort().get();
+            } catch (InterruptedException | ExecutionException e) {
+                COORDINATOR_LOG.error("Tx: {} Error during abort", transaction.getIdentifier(), e);
             }
             return null;
         }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java
new file mode 100644 (file)
index 0000000..ff0fbf9
--- /dev/null
@@ -0,0 +1,35 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ChangeListenerNotifyTask implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
+    private final Iterable<DataChangeListenerRegistration<?>> listeners;
+    private final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event;
+
+    public ChangeListenerNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+            final AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> event) {
+        this.listeners = listeners;
+        this.event = event;
+    }
+
+    @Override
+    public void run() {
+
+        for (DataChangeListenerRegistration<?> listener : listeners) {
+            try {
+                listener.getInstance().onDataChanged(event);
+            } catch (Exception e) {
+                LOG.error("Unhandled exception during invoking listener {} with event {}", listener, event, e);
+            }
+        }
+
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMImmutableDataChangeEvent.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMImmutableDataChangeEvent.java
new file mode 100644 (file)
index 0000000..d0ebcf5
--- /dev/null
@@ -0,0 +1,125 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public final class DOMImmutableDataChangeEvent implements
+        AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+    private final NormalizedNode<?, ?> original;
+    private final NormalizedNode<?, ?> updated;
+    private final Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> originalData;
+    private final Map<InstanceIdentifier, NormalizedNode<?, ?>> createdData;
+    private final Map<InstanceIdentifier, NormalizedNode<?, ?>> updatedData;
+    private final Set<InstanceIdentifier> removedPaths;
+
+    private DOMImmutableDataChangeEvent(final Builder change) {
+        original = change.before;
+        updated = change.after;
+        originalData = change.original.build();
+        createdData = change.created.build();
+        updatedData = change.updated.build();
+        removedPaths = change.removed.build();
+    }
+
+    public static final Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public NormalizedNode<?, ?> getOriginalSubtree() {
+        return original;
+    }
+
+    @Override
+    public NormalizedNode<?, ?> getUpdatedSubtree() {
+        return updated;
+    }
+
+    @Override
+    public Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> getOriginalData() {
+        return originalData;
+    }
+
+    @Override
+    public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+        return createdData;
+    }
+
+    @Override
+    public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+        return updatedData;
+    }
+
+    @Override
+    public Set<InstanceIdentifier> getRemovedPaths() {
+        return removedPaths;
+    }
+
+    public static class Builder {
+
+        private NormalizedNode<?, ?> after;
+        private NormalizedNode<?, ?> before;
+
+        private final ImmutableMap.Builder<InstanceIdentifier, NormalizedNode<?, ?>> original = ImmutableMap.builder();
+        private final ImmutableMap.Builder<InstanceIdentifier, NormalizedNode<?, ?>> created = ImmutableMap.builder();
+        private final ImmutableMap.Builder<InstanceIdentifier, NormalizedNode<?, ?>> updated = ImmutableMap.builder();
+        private final ImmutableSet.Builder<InstanceIdentifier> removed = ImmutableSet.builder();
+
+
+        private Builder() {
+
+        }
+
+        public Builder setAfter(final NormalizedNode<?, ?> node) {
+            after = node;
+            return this;
+        }
+
+        public DOMImmutableDataChangeEvent build() {
+
+            return new DOMImmutableDataChangeEvent(this);
+        }
+
+        public void merge(final DOMImmutableDataChangeEvent nestedChanges) {
+
+            original.putAll(nestedChanges.getOriginalData());
+            created.putAll(nestedChanges.getCreatedData());
+            updated.putAll(nestedChanges.getUpdatedData());
+            removed.addAll(nestedChanges.getRemovedPaths());
+
+        }
+
+        public Builder setBefore(final NormalizedNode<?, ?> node) {
+            this.before = node;
+            return this;
+        }
+
+        public Builder addCreated(final InstanceIdentifier path, final NormalizedNode<?, ?> node) {
+            created.put(path, node);
+            return this;
+        }
+
+        public Builder addRemoved(final InstanceIdentifier path, final NormalizedNode<?, ?> node) {
+            original.put(path, node);
+            removed.add(path);
+            return this;
+        }
+
+        public Builder addUpdated(final InstanceIdentifier path, final NormalizedNode<?, ?> before,
+                final NormalizedNode<?, ?> after) {
+            original.put(path, before);
+            updated.put(path, after);
+            return this;
+        }
+    }
+
+}
+
index 399bd5d..9961fcc 100644 (file)
@@ -23,9 +23,6 @@ class DataAndMetadataSnapshot {
     private final StoreMetadataNode metadataTree;
     private final Optional<SchemaContext> schemaContext;
 
-
-
-
     private DataAndMetadataSnapshot(final StoreMetadataNode metadataTree, final Optional<SchemaContext> schemaCtx) {
         this.metadataTree = metadataTree;
         this.schemaContext = schemaCtx;
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeEventResolver.java
new file mode 100644 (file)
index 0000000..c2faf86
--- /dev/null
@@ -0,0 +1,215 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.builder;
+import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.append;
+import static org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils.getChild;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+public class DataChangeEventResolver {
+
+    private static final DOMImmutableDataChangeEvent NO_CHANGE = builder().build();
+    private InstanceIdentifier rootPath;
+    private ListenerRegistrationNode listenerRoot;
+    private NodeModification modificationRoot;
+    private Optional<StoreMetadataNode> beforeRoot;
+    private Optional<StoreMetadataNode> afterRoot;
+    private final ImmutableList.Builder<ChangeListenerNotifyTask> tasks = ImmutableList.builder();
+
+    protected InstanceIdentifier getRootPath() {
+        return rootPath;
+    }
+
+    protected DataChangeEventResolver setRootPath(final InstanceIdentifier rootPath) {
+        this.rootPath = rootPath;
+        return this;
+    }
+
+    protected ListenerRegistrationNode getListenerRoot() {
+        return listenerRoot;
+    }
+
+    protected DataChangeEventResolver setListenerRoot(final ListenerRegistrationNode listenerRoot) {
+        this.listenerRoot = listenerRoot;
+        return this;
+    }
+
+    protected NodeModification getModificationRoot() {
+        return modificationRoot;
+    }
+
+    protected DataChangeEventResolver setModificationRoot(final NodeModification modificationRoot) {
+        this.modificationRoot = modificationRoot;
+        return this;
+    }
+
+    protected Optional<StoreMetadataNode> getBeforeRoot() {
+        return beforeRoot;
+    }
+
+    protected DataChangeEventResolver setBeforeRoot(final Optional<StoreMetadataNode> beforeRoot) {
+        this.beforeRoot = beforeRoot;
+        return this;
+    }
+
+    protected Optional<StoreMetadataNode> getAfterRoot() {
+        return afterRoot;
+    }
+
+    protected DataChangeEventResolver setAfterRoot(final Optional<StoreMetadataNode> afterRoot) {
+        this.afterRoot = afterRoot;
+        return this;
+    }
+
+    public Iterable<ChangeListenerNotifyTask> resolve() {
+        resolveAnyChangeEvent(rootPath, Optional.of(listenerRoot), modificationRoot, beforeRoot, afterRoot);
+        return tasks.build();
+    }
+
+    private DOMImmutableDataChangeEvent resolveAnyChangeEvent(final InstanceIdentifier path,
+            final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+            final Optional<StoreMetadataNode> before, final Optional<StoreMetadataNode> after) {
+        // No listeners are present in listener registration subtree
+        // no before and after state is present
+        if (!before.isPresent() && !after.isPresent()) {
+            return NO_CHANGE;
+        }
+        switch (modification.getModificationType()) {
+        case SUBTREE_MODIFIED:
+            return resolveSubtreeChangeEvent(path, listeners, modification, before.get(), after.get());
+        case WRITE:
+            if (before.isPresent()) {
+                return resolveReplacedEvent(path, listeners, modification, before.get(), after.get());
+            } else {
+                return resolveCreateEvent(path, listeners, after.get());
+            }
+        case DELETE:
+            return resolveDeleteEvent(path, listeners, before.get());
+        default:
+            return NO_CHANGE;
+        }
+
+    }
+
+    /**
+     * Resolves create events deep down the interest listener tree.
+     *
+     *
+     * @param path
+     * @param listeners
+     * @param afterState
+     * @return
+     */
+    private DOMImmutableDataChangeEvent resolveCreateEvent(final InstanceIdentifier path,
+            final Optional<ListenerRegistrationNode> listeners, final StoreMetadataNode afterState) {
+        final NormalizedNode<?, ?> node = afterState.getData();
+        Builder builder = builder().setAfter(node).addCreated(path, node);
+
+        for (StoreMetadataNode child : afterState.getChildren()) {
+            PathArgument childId = child.getIdentifier();
+            Optional<ListenerRegistrationNode> childListeners = getChild(listeners, childId);
+
+            InstanceIdentifier childPath = StoreUtils.append(path, childId);
+            builder.merge(resolveCreateEvent(childPath, childListeners, child));
+        }
+        DOMImmutableDataChangeEvent event = builder.build();
+        if (listeners.isPresent()) {
+            addNotifyTask(listeners.get().getListeners(), event);
+        }
+        return event;
+    }
+
+    private DOMImmutableDataChangeEvent resolveDeleteEvent(final InstanceIdentifier path,
+            final Optional<ListenerRegistrationNode> listeners, final StoreMetadataNode beforeState) {
+        final NormalizedNode<?, ?> node = beforeState.getData();
+        Builder builder = builder().setBefore(node).addRemoved(path, node);
+
+        for (StoreMetadataNode child : beforeState.getChildren()) {
+            PathArgument childId = child.getIdentifier();
+            Optional<ListenerRegistrationNode> childListeners = getChild(listeners, childId);
+            InstanceIdentifier childPath = StoreUtils.append(path, childId);
+            builder.merge(resolveDeleteEvent(childPath, childListeners, child));
+        }
+        DOMImmutableDataChangeEvent event = builder.build();
+        if (listeners.isPresent()) {
+            addNotifyTask(listeners.get().getListeners(), event);
+        }
+        return event;
+
+    }
+
+    private DOMImmutableDataChangeEvent resolveSubtreeChangeEvent(final InstanceIdentifier path,
+            final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+            final StoreMetadataNode before, final StoreMetadataNode after) {
+
+        Builder one = builder().setBefore(before.getData()).setAfter(after.getData());
+
+        Builder subtree = builder();
+
+        for (NodeModification childMod : modification.getModifications()) {
+            PathArgument childId = childMod.getIdentifier();
+            InstanceIdentifier childPath = append(path, childId);
+            Optional<ListenerRegistrationNode> childListen = getChild(listeners, childId);
+
+            Optional<StoreMetadataNode> childBefore = before.getChild(childId);
+            Optional<StoreMetadataNode> childAfter = after.getChild(childId);
+
+            switch (childMod.getModificationType()) {
+            case WRITE:
+            case DELETE:
+                one.merge(resolveAnyChangeEvent(childPath, childListen, childMod, childBefore, childBefore));
+                break;
+            case SUBTREE_MODIFIED:
+                subtree.merge(resolveSubtreeChangeEvent(childPath, childListen, childMod, childBefore.get(),
+                        childAfter.get()));
+                break;
+            }
+        }
+        DOMImmutableDataChangeEvent oneChangeEvent = one.build();
+        subtree.merge(oneChangeEvent);
+        DOMImmutableDataChangeEvent subtreeEvent = subtree.build();
+        if (listeners.isPresent()) {
+            addNotifyTask(listeners.get(), DataChangeScope.ONE, oneChangeEvent);
+            addNotifyTask(listeners.get(), DataChangeScope.SUBTREE, subtreeEvent);
+        }
+        return subtreeEvent;
+    }
+
+    private DOMImmutableDataChangeEvent resolveReplacedEvent(final InstanceIdentifier path,
+            final Optional<ListenerRegistrationNode> listeners, final NodeModification modification,
+            final StoreMetadataNode before, final StoreMetadataNode after) {
+        // FIXME Add task
+        return builder().build();
+    }
+
+    private void addNotifyTask(final ListenerRegistrationNode listenerRegistrationNode, final DataChangeScope one,
+            final DOMImmutableDataChangeEvent event) {
+
+
+
+    }
+
+    private void addNotifyTask(final Iterable<DataChangeListenerRegistration<?>> listeners,
+            final DOMImmutableDataChangeEvent event) {
+        tasks .add(new ChangeListenerNotifyTask(ImmutableSet.copyOf(listeners),event));
+    }
+
+    public static DataChangeEventResolver create() {
+        return new DataChangeEventResolver();
+    }
+
+
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..d3a892a
--- /dev/null
@@ -0,0 +1,22 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public interface DataChangeListenerRegistration<L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+extends ListenerRegistration<L> {
+
+
+    @Override
+    public L getInstance();
+
+    InstanceIdentifier getPath();
+
+    DataChangeScope getScope();
+
+
+
+}
index 4d5b8ee..39299ab 100644 (file)
@@ -16,8 +16,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -43,22 +45,27 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+    private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
 
+
+    private final ListeningExecutorService executor;
+    private final String name;
     private final AtomicLong txCounter = new AtomicLong(0);
 
     private DataAndMetadataSnapshot snapshot;
-    private ModificationApplyOperation operation;
+    private ModificationApplyOperation operationTree;
+    private final ListenerRegistrationNode listenerTree;
+
 
-    private final ListeningExecutorService executor;
-    private final String name;
 
     private SchemaContext schemaContext;
 
     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
         this.executor = executor;
         this.name = name;
-        this.operation = new AllwaysFailOperation();
+        this.operationTree = new AllwaysFailOperation();
         this.snapshot = DataAndMetadataSnapshot.createEmpty();
+        this.listenerTree = ListenerRegistrationNode.createRoot();
     }
 
     @Override
@@ -73,24 +80,48 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operation);
+        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operation);
+        return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
     }
 
     @Override
     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
-        operation = SchemaAwareApplyOperationRoot.from(ctx);
+        operationTree = SchemaAwareApplyOperationRoot.from(ctx);
         schemaContext = ctx;
     }
 
     @Override
     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
-        return null;
+
+        Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
+        checkState(listenerNode.isPresent());
+        synchronized (listener) {
+            notifyInitialState(path, listener);
+        }
+        return listenerNode.get().registerDataChangeListener(listener, scope);
+    }
+
+    private void notifyInitialState(final InstanceIdentifier path,
+            final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
+        Optional<StoreMetadataNode> currentState = snapshot.read(path);
+        try {
+            if (currentState.isPresent()) {
+                NormalizedNode<?, ?> data = currentState.get().getData();
+                listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
+                        .setAfter(data) //
+                        .addCreated(path, data) //
+                        .build() //
+                );
+            }
+        } catch (Exception e) {
+            LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
+        }
+
     }
 
     private synchronized DOMStoreThreePhaseCommitCohort submit(
@@ -102,6 +133,21 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         return name + "-" + txCounter.getAndIncrement();
     }
 
+    private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
+            final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
+        LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
+        checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
+        snapshot = DataAndMetadataSnapshot.builder() //
+                .setMetadataTree(newDataTree) //
+                .setSchemaContext(schemaContext) //
+                .build();
+
+        for(ChangeListenerNotifyTask task : listenerTasks) {
+            executor.submit(task);
+        }
+
+    }
+
     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
 
         private DataAndMetadataSnapshot stableSnapshot;
@@ -185,7 +231,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public synchronized DOMStoreThreePhaseCommitCohort ready() {
             ready = true;
-            LOG.debug("Store transaction: {} : Ready",getIdentifier());
+            LOG.debug("Store transaction: {} : Ready", getIdentifier());
             mutableTree.seal();
             return store.submit(this);
         }
@@ -228,6 +274,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         private DataAndMetadataSnapshot storeSnapshot;
         private Optional<StoreMetadataNode> proposedSubtree;
+        private Iterable<ChangeListenerNotifyTask> listenerTasks;
 
         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
             this.transaction = writeTransaction;
@@ -237,14 +284,15 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public ListenableFuture<Boolean> canCommit() {
             final DataAndMetadataSnapshot snapshotCapture = snapshot;
-            final ModificationApplyOperation snapshotOperation = operation;
+            final ModificationApplyOperation snapshotOperation = operationTree;
 
             return executor.submit(new Callable<Boolean>() {
 
                 @Override
                 public Boolean call() throws Exception {
-                    boolean applicable = snapshotOperation.isApplicable(modification, Optional.of(snapshotCapture.getMetadataTree()));
-                    LOG.debug("Store Transcation: {} : canCommit : {}",transaction.getIdentifier(),applicable);
+                    boolean applicable = snapshotOperation.isApplicable(modification,
+                            Optional.of(snapshotCapture.getMetadataTree()));
+                    LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
                     return applicable;
                 }
             });
@@ -255,10 +303,24 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             storeSnapshot = snapshot;
             return executor.submit(new Callable<Void>() {
 
+
+
                 @Override
                 public Void call() throws Exception {
                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
-                    proposedSubtree = operation.apply(modification, Optional.of(metadataTree),increase(metadataTree.getSubtreeVersion()));
+
+                    proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
+                            increase(metadataTree.getSubtreeVersion()));
+
+
+                    listenerTasks = DataChangeEventResolver.create() //
+                            .setRootPath(PUBLIC_ROOT_PATH) //
+                            .setBeforeRoot(Optional.of(metadataTree)) //
+                            .setAfterRoot(proposedSubtree) //
+                            .setModificationRoot(modification) //
+                            .setListenerRoot(listenerTree) //
+                            .resolve();
+
                     return null;
                 }
             });
@@ -276,25 +338,17 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
             checkState(proposedSubtree != null);
             checkState(storeSnapshot != null);
             // return ImmediateFuture<>;
-            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree);
+            InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
             return Futures.<Void> immediateFuture(null);
         }
 
     }
 
-    private synchronized void commit(final DataAndMetadataSnapshot storeSnapshot,
-            final Optional<StoreMetadataNode> proposedSubtree) {
-        //LOG.info("Updating Store snaphot.");
-        checkState(snapshot == storeSnapshot, "Store snapshot and transaction snapshot differs");
-        snapshot = DataAndMetadataSnapshot.builder().setMetadataTree(proposedSubtree.get())
-                .setSchemaContext(schemaContext).build();
-    }
-
     private class AllwaysFailOperation implements ModificationApplyOperation {
 
         @Override
         public Optional<StoreMetadataNode> apply(final NodeModification modification,
-                final Optional<StoreMetadataNode> storeMeta,final UnsignedLong subtreeVersion) {
+                final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
             throw new IllegalStateException("Schema Context is not available.");
         }
 
index 0692497..4ad941a 100644 (file)
@@ -1,15 +1,96 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.UnsignedLong;
 
 public final class StoreUtils {
 
+    private final static Function<Identifiable<Object>, Object> EXTRACT_IDENTIFIER = new Function<Identifiable<Object>, Object>() {
+
+        @Override
+        public Object apply(final Identifiable<Object> input) {
+            return input.getIdentifier();
+        }
+    };
 
     public static final UnsignedLong increase(final UnsignedLong original) {
         return original.plus(UnsignedLong.ONE);
     }
 
+    public static final InstanceIdentifier append(final InstanceIdentifier parent, final PathArgument arg) {
+
+        return new InstanceIdentifier(ImmutableList.<PathArgument> builder().addAll(parent.getPath()).add(arg).build());
+    }
+
+    public static AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> initialChangeEvent(
+            final InstanceIdentifier path, final StoreMetadataNode data) {
+        return new InitialDataChangeEvent(path, data.getData());
+    }
+
+    private static final class InitialDataChangeEvent implements
+            AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> {
+
+        private final ImmutableMap<InstanceIdentifier, NormalizedNode<?, ?>> payload;
+        private final NormalizedNode<?, ?> data;
+
+        public InitialDataChangeEvent(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
+            payload = ImmutableMap.<InstanceIdentifier, NormalizedNode<?, ?>> of(path, data);
+            this.data = data;
+        }
 
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getCreatedData() {
+            return payload;
+        }
 
+        @Override
+        public Map<InstanceIdentifier, ? extends NormalizedNode<?, ?>> getOriginalData() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public NormalizedNode<?, ?> getOriginalSubtree() {
+            return null;
+        }
+
+        @Override
+        public Set<InstanceIdentifier> getRemovedPaths() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Map<InstanceIdentifier, NormalizedNode<?, ?>> getUpdatedData() {
+            return payload;
+        }
+
+        @Override
+        public NormalizedNode<?, ?> getUpdatedSubtree() {
+            return data;
+        }
+
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static <V> Function<Identifiable<V>,V> identifierExtractor() {
+        return (Function) EXTRACT_IDENTIFIER;
+    }
+
+    public static <V> Set<V> toIdentifierSet(final Iterable<? extends Identifiable<V>> children) {
+        return FluentIterable.from(children).transform(StoreUtils.<V>identifierExtractor()).toSet();
+    }
 
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/tree/ListenerRegistrationNode.java
new file mode 100644 (file)
index 0000000..d6d1ca3
--- /dev/null
@@ -0,0 +1,121 @@
+package org.opendaylight.controller.md.sal.dom.store.impl.tree;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+import com.google.common.base.Optional;
+
+public class ListenerRegistrationNode implements StoreTreeNode<ListenerRegistrationNode>,Identifiable<PathArgument> {
+
+    private final ListenerRegistrationNode parent;
+    private final Map<PathArgument, ListenerRegistrationNode> children;
+    private final PathArgument identifier;
+    private final ConcurrentSkipListSet<DataChangeListenerRegistration<?>> listeners;
+
+    private ListenerRegistrationNode(final PathArgument identifier) {
+        this(null,identifier);
+    }
+
+    private ListenerRegistrationNode(final ListenerRegistrationNode parent,final PathArgument identifier) {
+        this.parent = parent;
+        this.identifier = identifier;
+        children = new HashMap<>();
+        listeners = new ConcurrentSkipListSet<>();
+    }
+
+    public final static ListenerRegistrationNode createRoot() {
+        return new ListenerRegistrationNode(null);
+    }
+
+    @Override
+    public PathArgument getIdentifier() {
+        return identifier;
+    }
+
+    public Iterable<DataChangeListenerRegistration<?>> getListeners() {
+        return listeners;
+    }
+
+    @Override
+    public synchronized Optional<ListenerRegistrationNode> getChild(final PathArgument child) {
+        ListenerRegistrationNode potential = (children.get(child));
+        if(potential == null) {
+            potential = new ListenerRegistrationNode(this, child);
+            children.put(child, potential);
+        }
+        return Optional.of(potential);
+    }
+
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerDataChangeListener(
+            final L listener, final DataChangeScope scope) {
+        DataChangeListenerRegistration<L> listenerReg = new DataChangeListenerRegistration<L>(listener, scope,this);
+        listeners.add(listenerReg);
+        return listenerReg;
+    }
+
+    private void removeListener(final DataChangeListenerRegistration<?> listener) {
+        listeners.remove(listener);
+        removeThisIfUnused();
+    }
+
+
+    private void removeThisIfUnused() {
+        if(parent != null && listeners.isEmpty() && children.isEmpty()) {
+            parent.removeChildIfUnused(this);
+        }
+    }
+
+    public boolean isUnused() {
+        return (listeners.isEmpty() && children.isEmpty()) || areChildrenUnused();
+    }
+
+    private boolean areChildrenUnused() {
+        for(ListenerRegistrationNode child :  children.values()) {
+            if(!child.isUnused()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void removeChildIfUnused(final ListenerRegistrationNode listenerRegistrationNode) {
+        // FIXME Remove unnecessary
+    }
+
+
+
+
+    public static class DataChangeListenerRegistration<T extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> extends AbstractObjectRegistration<T>
+            implements ListenerRegistration<T> {
+
+        private final DataChangeScope scope;
+        private ListenerRegistrationNode node;
+
+        public DataChangeListenerRegistration(final T listener, final DataChangeScope scope, final ListenerRegistrationNode node) {
+            super(listener);
+
+            this.scope = scope;
+            this.node = node;
+        }
+
+        protected DataChangeScope getScope() {
+            return scope;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            node.removeListener(this);
+            node = null;
+        }
+    }
+}
index 67cfde2..dc89309 100644 (file)
@@ -64,4 +64,11 @@ public class TreeNodeUtils {
         return new SimpleEntry<InstanceIdentifier,T>(parentPath,parent.get());
     }
 
+    public static <T extends StoreTreeNode<T>> Optional<T> getChild(final Optional<T> parent,final PathArgument child) {
+        if(parent.isPresent()) {
+            return parent.get().getChild(child);
+        }
+        return Optional.absent();
+    }
+
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.