Unification of broker concepts implementations 47/2447/4
authorTony Tkacik <ttkacik@cisco.com>
Sun, 3 Nov 2013 19:37:46 +0000 (20:37 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Thu, 7 Nov 2013 11:13:48 +0000 (12:13 +0100)
  - Introduced AbstractDataReadRouter into sal-common-impl
    which deals with data read routing.

  - AbstractDataReadRouter is then used inside sal-binding-broker
    to route reads between binding aware components

    and inside sal-dom-broker to route reads inside a mount point
    or between binding independent components.

    Extracted Rpc Routing logic from BrokerImpl.xtend and moved it to
    separate class, which is used in the broker (global context)
    and also in the mount-points (nested subsystems).

Change-Id: I7eaaddafe9f4dcb2ca6d25090246dfd51940b2d7
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
31 files changed:
opendaylight/md-sal/sal-binding-api/pom.xml
opendaylight/md-sal/sal-binding-broker/pom.xml
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataChange.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataCommitHandler.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/pom.xml
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/data/DataProviderService.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/pom.xml
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-netconf-connector/pom.xml

index c1dccdf..cfbd4f7 100644 (file)
@@ -38,6 +38,7 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
             <version>${osgi.core.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 </project>
index b8b37af..9ca025b 100644 (file)
                 <configuration>
                     <instructions>
                         <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                        <Export-package>
+                            org.opendaylight.controller.sal.binding.spi.*,
+                        </Export-package>
                         <Private-Package>
                             org.opendaylight.controller.config.yang.md.sal.binding.impl,
-                            org.opendaylight.controller.sal.binding.spi,
-                            org.opendaylight.controller.sal.binding.spi.*,
                             org.opendaylight.controller.sal.binding.impl,
                             org.opendaylight.controller.sal.binding.impl.*,
+                            org.opendaylight.controller.sal.binding.codegen,
                             org.opendaylight.controller.sal.binding.codegen.*,
                         </Private-Package>
                     </instructions>
index 9356ecd..6ed63b2 100644 (file)
@@ -5,23 +5,13 @@ import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
 import org.opendaylight.yangtools.yang.binding.DataObject
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
 import org.opendaylight.controller.md.sal.common.api.data.DataReader
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.yangtools.concepts.ListenerRegistration
-import static extension org.opendaylight.controller.sal.binding.impl.util.MapUtils.*;
-import java.util.Collection
-import java.util.Map.Entry
-import java.util.HashSet
-import java.util.Set
 import com.google.common.collect.Multimap
 import static com.google.common.base.Preconditions.*;
 import java.util.List
-import java.util.LinkedList
-import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
 import com.google.common.collect.HashMultimap
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Callable
@@ -30,15 +20,17 @@ import org.opendaylight.controller.sal.common.util.Rpcs
 import java.util.Collections
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
 import java.util.ArrayList
-import org.opendaylight.controller.sal.common.util.RpcErrors
+import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
+import java.util.Arrays
 
 class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {
 
     @Property
     var ExecutorService executor;
 
-    Multimap<InstanceIdentifier, DataReaderRegistration> configReaders = HashMultimap.create();
-    Multimap<InstanceIdentifier, DataReaderRegistration> operationalReaders = HashMultimap.create();
+    val dataReadRouter = new BindingAwareDataReaderRouter;
+
     Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
     Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();
 
@@ -47,13 +39,11 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
     }
 
     override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
-        val readers = configReaders.getAllChildren(path);
-        return readers.readConfiguration(path);
+        return dataReadRouter.readConfigurationData(path);
     }
 
     override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
-        val readers = operationalReaders.getAllChildren(path);
-        return readers.readOperational(path);
+        return dataReadRouter.readOperationalData(path);
     }
 
     override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
@@ -69,20 +59,12 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
         return reg;
     }
 
-    override registerDataReader(InstanceIdentifier<? extends DataObject> path,
-        DataReader<InstanceIdentifier<? extends DataObject>, DataObject> provider) {
-        val ret = new DataReaderRegistration(provider, this);
-        ret.paths.add(path);
-        configReaders.put(path, ret);
-        operationalReaders.put(path, ret);
-        return ret;
-    }
-
-    protected def removeReader(DataReaderRegistration reader) {
-        for (path : reader.paths) {
-            operationalReaders.remove(path, reader);
-            configReaders.remove(path, reader);
-        }
+    override registerDataReader(InstanceIdentifier<? extends DataObject> path,DataReader<InstanceIdentifier<? extends DataObject>,DataObject> reader) {
+        
+        val confReg = dataReadRouter.registerConfigurationReader(path,reader);
+        val dataReg = dataReadRouter.registerOperationalReader(path,reader);
+        
+        return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
     }
 
     protected def removeListener(DataChangeListenerRegistration registration) {
@@ -92,39 +74,8 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
     protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
         commitHandlers.remove(registration.path, registration);
     }
-
-    protected def DataObject readConfiguration(
-        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
-        InstanceIdentifier<? extends DataObject> path) {
-
-        val List<DataObject> partialResults = new LinkedList();
-        for (entry : entries) {
-            partialResults.add(entry.value.instance.readConfigurationData(path))
-        }
-        return merge(path, partialResults);
-    }
-
-    protected def DataObject readOperational(
-        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
-        InstanceIdentifier<? extends DataObject> path) {
-
-        val List<DataObject> partialResults = new LinkedList();
-        for (entry : entries) {
-            partialResults.add(entry.value.instance.readOperationalData(path))
-        }
-        return merge(path, partialResults);
-    }
-
-    protected def DataObject merge(InstanceIdentifier<? extends DataObject> identifier, List<DataObject> objects) {
-
-        // FIXME: implement real merge
-        if (objects.size > 0) {
-            return objects.get(0);
-        }
-    }
     
     protected def getActiveCommitHandlers() {
-        
         return commitHandlers.entries.map[ value.instance].toSet
     }
 
@@ -137,26 +88,6 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
 
 }
 
-package class DataReaderRegistration extends //
-AbstractObjectRegistration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> {
-
-    DataBrokerImpl dataBroker;
-
-    @Property
-    val Set<InstanceIdentifier<? extends DataObject>> paths;
-
-    new(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> instance, DataBrokerImpl broker) {
-        super(instance)
-        dataBroker = broker;
-        _paths = new HashSet();
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeReader(this);
-    }
-
-}
-
 package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {
 
     DataBrokerImpl dataBroker;
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend
deleted file mode 100644 (file)
index 398a219..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl
-
-import org.opendaylight.controller.sal.common.DataStoreIdentifier
-import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
-
-class DataProviderContext {
-
-    @Property
-    var DataStoreIdentifier identifier;
-    @Property
-    var RuntimeDataProvider provider;
-}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend
new file mode 100644 (file)
index 0000000..f586a8b
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.binding.impl.util
+
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
+import org.opendaylight.yangtools.yang.binding.DataObject
+
+class BindingAwareDataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier<? extends DataObject>, DataObject> {
+    
+    override protected merge(InstanceIdentifier<? extends DataObject> path, Iterable<DataObject> data) {
+        return data.iterator.next;
+    }
+    
+}
\ No newline at end of file
index 0fea50b..5556525 100644 (file)
@@ -13,19 +13,102 @@ import java.util.Set;
 // FIXME: After 0.6 Release of YANGTools refactor to use Path marker interface for arguments.
 // import org.opendaylight.yangtools.concepts.Path;
 
+public interface DataChange<P/* extends Path<P> */, D> {
 
-public interface DataChange<P/* extends Path<P> */,D> {
+    /**
+     * Returns a map of paths and newly created objects
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getCreatedOperationalData();
 
-    Map<P,D> getCreatedOperationalData();
+    /**
+     * Returns a map of paths and newly created objects
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getCreatedConfigurationData();
 
-    Map<P,D> getUpdatedOperationalData();
+    /**
+     * Returns a map of paths and respective updated objects after update.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalOperationalData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getUpdatedOperationalData();
 
+    /**
+     * Returns a map of paths and respective updated objects after update.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalConfigurationData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getUpdatedConfigurationData();
+
+    /**
+     * Returns a set of paths of removed objects.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalConfigurationData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Set<P> getRemovedConfigurationData();
+
+    /**
+     * Returns a set of paths of removed objects.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalOperationalData()}
+     * 
+     * @return map of paths and newly created objects
+     */
     Set<P> getRemovedOperationalData();
 
-    Map<P,D> getCreatedConfigurationData();
+    /**
+     * Return a map of paths and original state of updated and removed objectd.
+     * 
+     * @return map of paths and original state of updated and removed objectd.
+     */
+    Map<P, D> getOriginalConfigurationData();
 
-    Map<P,D> getUpdatedConfigurationData();
+    /**
+     * Return a map of paths and original state of updated and removed objectd.
+     * 
+     * @return map of paths and original state of updated and removed objectd.
+     */
+    Map<P, D> getOriginalOperationalData();
 
-    Set<P> getRemovedConfigurationData();
+    /**
+     * Returns a original subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getOriginalConfigurationSubtree();
+
+    /**
+     * Returns a original subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getOriginalOperationalSubtree();
+
+    /**
+     * Returns a new subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getUpdatedConfigurationSubtree();
+
+    /**
+     * Returns a new subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getUpdatedOperationalSubtree();
 
 }
index 85e3d8f..90de13d 100644 (file)
@@ -84,7 +84,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
  */
 public interface DataCommitHandler<P/* extends Path<P> */,D> {
 
-
+    
     DataCommitTransaction<P, D> requestCommit(DataModification<P,D> modification);
 
     public interface DataCommitTransaction<P/* extends Path<P> */,D> {
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java
new file mode 100644 (file)
index 0000000..f448d4e
--- /dev/null
@@ -0,0 +1,16 @@
+package org.opendaylight.controller.md.sal.common.api.data;
+
+public interface DataStore<P, D> extends //
+        DataReader<P, D>, //
+        DataModificationTransactionFactory<P, D> {
+
+    @Override
+    public DataModification<P, D> beginTransaction();
+
+    @Override
+    public D readConfigurationData(P path);
+
+    @Override
+    public D readOperationalData(P path);
+
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java
new file mode 100644 (file)
index 0000000..afe9e99
--- /dev/null
@@ -0,0 +1,10 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import org.opendaylight.yangtools.concepts.Immutable;
+
+public interface Route<C,P> extends Immutable {
+
+    C getType();
+    
+    P getPath();
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java
new file mode 100644 (file)
index 0000000..994f65b
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import java.util.EventListener;
+
+public interface RouteChangeListener<C,P> extends EventListener {
+
+    void onRouteChange(RouteChange<C, P> change);
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java
new file mode 100644 (file)
index 0000000..89851c9
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public interface RouteChangePublisher<C,P> {
+
+    ListenerRegistration<RouteChangeListener<C,P>> registerRouteChangeListener(RouteChangeListener<C,P> listener);
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java
new file mode 100644 (file)
index 0000000..8d0a90c
--- /dev/null
@@ -0,0 +1,10 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface Router<C,P,D> extends //
+        RouteChangePublisher<C, P> {
+
+    Map<C, Set<P>> getAnnouncedPaths();
+}
index 0c2344a..3bd51ec 100644 (file)
         <artifactId>maven-bundle-plugin</artifactId>
         <configuration>
           <instructions>
-            <Export-Package>org.opendaylight.controller.md.sal.common.impl</Export-Package>
+            <Export-Package>
+                org.opendaylight.controller.md.sal.common.impl,
+                org.opendaylight.controller.md.sal.common.impl.*
+            </Export-Package>
           </instructions>
         </configuration>
       </plugin>
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java
new file mode 100644 (file)
index 0000000..f83c61f
--- /dev/null
@@ -0,0 +1,187 @@
+package org.opendaylight.controller.md.sal.common.impl.routing;
+
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.concepts.Registration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Base abstract implementation of DataReadRouter, which performs
+ * a read operation on multiple data readers and then merges result.
+ * 
+ * @param <P>
+ * @param <D>
+ */
+public abstract class AbstractDataReadRouter<P extends Path<?>, D> implements DataReader<P, D> {
+
+    Multimap<P, DataReaderRegistration<P, D>> configReaders = HashMultimap.create();
+    Multimap<P, DataReaderRegistration<P, D>> operationalReaders = HashMultimap.create();
+
+    @Override
+    public D readConfigurationData(P path) {
+        FluentIterable<D> dataBits = FluentIterable //
+                .from(getReaders(configReaders, path)).transform(configurationRead(path));
+        return merge(path,dataBits);
+    }
+
+    @Override
+    public D readOperationalData(P path) {
+        FluentIterable<D> dataBits = FluentIterable //
+                .from(getReaders(configReaders, path)).transform(operationalRead(path));
+        return merge(path,dataBits);
+
+    }
+
+    /**
+     * Merges data readed by reader instances from specified path
+     * 
+     * @param path Path on which read was performed
+     * @param data Data which was returned by read operation.
+     * @return Merged result.
+     */
+    protected abstract D merge(P path,Iterable<D> data);
+
+    /**
+     * Returns a function which performs configuration read for supplied path
+     * 
+     * @param path
+     * @return function which performs configuration read for supplied path
+     */
+    
+    private Function<DataReader<P, D>, D> configurationRead(final P path) {
+        return new Function<DataReader<P, D>, D>() {
+            @Override
+            public D apply(DataReader<P, D> input) {
+                return input.readConfigurationData(path);
+            }
+        };
+    }
+
+    /**
+     * Returns a function which performs operational read for supplied path
+     * 
+     * @param path
+     * @return function which performs operational read for supplied path
+     */
+    private Function<DataReader<P, D>, D> operationalRead(final P path) {
+        return new Function<DataReader<P, D>, D>() {
+            @Override
+            public D apply(DataReader<P, D> input) {
+                return input.readConfigurationData(path);
+            }
+        };
+    }
+
+    // Registrations
+
+    /**
+     * Register's a reader for operational data.
+     * 
+     * @param path Path which is served by this reader
+     * @param reader Reader instance which is responsible for reading particular subpath.
+     * @return 
+     */
+    public Registration<DataReader<P, D>> registerOperationalReader(P path, DataReader<P, D> reader) {
+        OperationalDataReaderRegistration<P, D> ret = new OperationalDataReaderRegistration<>(path, reader);
+        operationalReaders.put(path, ret);
+        return ret;
+    }
+
+    public Registration<DataReader<P, D>> registerConfigurationReader(P path, DataReader<P, D> reader) {
+        ConfigurationDataReaderRegistration<P, D> ret = new ConfigurationDataReaderRegistration<>(path, reader);
+        configReaders.put(path, ret);
+        return ret;
+    }
+
+    Iterable<DataReader<P, D>> getOperationalReaders(P path) {
+        return getReaders(operationalReaders, path);
+    }
+
+    Iterable<DataReader<P, D>> getConfigurationReaders(P path) {
+        return getReaders(configReaders, path);
+    }
+
+    private Iterable<DataReader<P, D>> getReaders(Multimap<P, DataReaderRegistration<P, D>> readerMap, P path) {
+        return FluentIterable
+            .from(readerMap.entries()) //
+            .filter(affects(path)) //
+            .transform(retrieveInstance());
+    }
+
+    private void removeRegistration(OperationalDataReaderRegistration<?, ?> registration) {
+        operationalReaders.remove(registration.getKey(), registration);
+    }
+
+    private void removeRegistration(ConfigurationDataReaderRegistration<?, ?> registration) {
+        configReaders.remove(registration.getKey(), registration);
+    }
+
+    private Function<? super Entry<P, DataReaderRegistration<P, D>>, DataReader<P, D>> retrieveInstance() {
+        return new Function<Entry<P, DataReaderRegistration<P, D>>, DataReader<P,D>>() {
+            @Override
+            public DataReader<P, D> apply(Entry<P, DataReaderRegistration<P, D>> input) {
+                return input.getValue().getInstance();
+            }
+        };
+    }
+
+    private Predicate<? super Entry<P, DataReaderRegistration<P, D>>> affects(final P path) {
+        
+        return new Predicate<Entry<P, DataReaderRegistration<P, D>>>() {
+            
+            @Override
+            public boolean apply(Entry<P, DataReaderRegistration<P, D>> input) {
+                final Path key = input.getKey();
+                return key.contains(path) || ((Path) path).contains(key);
+            }
+            
+        };
+    }
+
+    private class ConfigurationDataReaderRegistration<P extends Path<?>, D> extends DataReaderRegistration<P, D> {
+
+        public ConfigurationDataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(key, instance);
+        }
+
+        @Override
+        protected void removeRegistration() {
+            AbstractDataReadRouter.this.removeRegistration(this);
+        }
+    }
+
+    private class OperationalDataReaderRegistration<P extends Path<?>, D> extends DataReaderRegistration<P, D> {
+
+        public OperationalDataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(key, instance);
+        }
+
+        @Override
+        protected void removeRegistration() {
+            AbstractDataReadRouter.this.removeRegistration(this);
+        }
+    }
+
+    private abstract static class DataReaderRegistration<P extends Path<?>, D> extends
+            AbstractObjectRegistration<DataReader<P, D>> {
+
+        private final P key;
+
+        public P getKey() {
+            return this.key;
+        }
+
+        public DataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(instance);
+            this.key = key;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java
new file mode 100644 (file)
index 0000000..c19ee1a
--- /dev/null
@@ -0,0 +1,22 @@
+package org.opendaylight.controller.sal.core.api;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface RpcConsumptionRegistry {
+    /**
+     * Sends an RPC to other components registered to the broker.
+     * 
+     * @see RpcImplementation
+     * @param rpc
+     *            Name of RPC
+     * @param input
+     *            Input data to the RPC
+     * @return Result of the RPC call
+     */
+    Future<RpcResult<CompositeNode>> rpc(QName rpc, CompositeNode input);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java
new file mode 100644 (file)
index 0000000..c326bab
--- /dev/null
@@ -0,0 +1,33 @@
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public interface RpcProvisionRegistry {
+
+    /**
+     * Registers an implementation of the rpc.
+     * 
+     * <p>
+     * The registered rpc functionality will be available to all other
+     * consumers and providers registered to the broker, which are aware of
+     * the {@link QName} assigned to the rpc.
+     * 
+     * <p>
+     * There is no assumption that rpc type is in the set returned by
+     * invoking {@link RpcImplementation#getSupportedRpcs()}. This allows
+     * for dynamic rpc implementations.
+     * 
+     * @param rpcType
+     *            Name of Rpc
+     * @param implementation
+     *            Provider's Implementation of the RPC functionality
+     * @throws IllegalArgumentException
+     *             If the name of RPC is invalid
+     */
+    RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException;
+
+    RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+}
index 3024c89..20fa29d 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.controller.sal.core.api.data;
 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
 import org.opendaylight.controller.sal.common.DataStoreIdentifier;
 import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;;
 
 public interface DataProviderService extends 
     DataBrokerService, //
@@ -54,6 +56,11 @@ public interface DataProviderService extends
      */
     void removeRefresher(DataStoreIdentifier store, DataRefresher refresher);
 
+    
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> registerConfigurationReader(InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader);
+
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> registerOperationalReader(InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader);
+    
     public interface DataRefresher extends Provider.ProviderFunctionality {
 
         /**
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java
new file mode 100644 (file)
index 0000000..8f6a5d0
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.sal.core.api.mount;
+
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+
+public interface MountProvisionInstance extends MountInstance, NotificationPublishService, RpcProvisionRegistry {
+
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java
new file mode 100644 (file)
index 0000000..fade7d3
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.core.api.mount;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface MountProvisionService extends MountService {
+
+    @Override
+    public MountProvisionInstance getMountPoint(InstanceIdentifier path);
+    
+    MountProvisionInstance createMountPoint(InstanceIdentifier path);
+    
+    MountProvisionInstance createOrGetMountPoint(InstanceIdentifier path);
+}
index 678728a..9383a9e 100644 (file)
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-common-util</artifactId>\r
             <version>1.0-SNAPSHOT</version>\r
+        </dependency>\r
+                <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-common-impl</artifactId>\r
+            <version>1.0-SNAPSHOT</version>\r
         </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
@@ -60,7 +65,7 @@
                         <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>\r
                         <Bundle-Activator>org.opendaylight.controller.sal.dom.broker.BrokerActivator</Bundle-Activator>\r
                         <Private-Package>\r
-                            org.opendaylight.controller.sal.dom.broker,\r
+                            org.opendaylight.controller.sal.dom.broker.*\r
                         </Private-Package>\r
                     </instructions>\r
                 </configuration>\r
index 83dda59..855ad9b 100644 (file)
@@ -7,29 +7,28 @@
  */
 package org.opendaylight.controller.sal.dom.broker;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.Consumer;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-import org.slf4j.LoggerFactory;
+import java.util.Collections
+import java.util.HashMap
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import java.util.concurrent.Callable
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import org.opendaylight.controller.sal.core.api.Broker
+import org.opendaylight.controller.sal.core.api.BrokerService
+import org.opendaylight.controller.sal.core.api.Consumer
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.spi.BrokerModule
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.common.RpcResult
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.osgi.framework.BundleContext
+import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.yangtools.concepts.ListenerRegistration
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
 
 public class BrokerImpl implements Broker {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
@@ -42,17 +41,14 @@ public class BrokerImpl implements Broker {
     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
 
-
-    private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
-    // RPC Context
-    private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
-        new HashMap<QName, RpcImplementation>());
-
     // Implementation specific
     @Property
     private var ExecutorService executor = Executors.newFixedThreadPool(5);
     @Property
     private var BundleContext bundleContext;
+    
+    @Property
+    private var RpcRouter router;
 
     override registerConsumer(Consumer consumer, BundleContext ctx) {
         checkPredicates(consumer);
@@ -95,42 +91,8 @@ public class BrokerImpl implements Broker {
         return prov.getServiceForSession(service, session);
     }
 
-    // RPC Functionality
-    protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
-        if(rpcImpls.get(rpcType) != null) {
-            throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
-        }
-
-        
-        rpcImpls.put(rpcType, implementation);
-
-        
-        for(listener : rpcRegistrationListeners.listeners)  {
-            try {
-                listener.instance.onRpcImplementationAdded(rpcType);
-            } catch (Exception e){
-                log.error("Unhandled exception during invoking listener",e);
-            }
-        }
-    }
-
-    protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
-        if(implToRemove == rpcImpls.get(rpcType)) {
-            rpcImpls.remove(rpcType);
-        }
-        
-        for(listener : rpcRegistrationListeners.listeners)  {
-            try {
-                listener.instance.onRpcImplementationRemoved(rpcType);
-            } catch (Exception e){
-                log.error("Unhandled exception during invoking listener",e);
-            }
-        }
-    }
-
     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        val impl = rpcImpls.get(rpc);
-        val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
+        val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
         return result;
     }
 
@@ -171,12 +133,4 @@ public class BrokerImpl implements Broker {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
-    
-    protected def getSupportedRpcs() {
-        rpcImpls.keySet;
-    }
-    
-    def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
-        rpcRegistrationListeners.register(listener);
-    }
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
new file mode 100644 (file)
index 0000000..7037b46
--- /dev/null
@@ -0,0 +1,118 @@
+package org.opendaylight.controller.sal.dom.broker;
+
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.dom.broker.impl.DataReaderRouter;
+import org.opendaylight.controller.sal.dom.broker.impl.NotificationRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class MountPointImpl implements MountProvisionInstance {
+
+    final RpcRouter rpcs;
+    final DataReaderRouter dataReader;
+    final NotificationRouter notificationRouter;
+
+    public MountPointImpl(InstanceIdentifier path) {
+        rpcs = new RpcRouterImpl("");
+        dataReader = new DataReaderRouter();
+        notificationRouter = new NotificationRouterImpl();
+    }
+
+    @Override
+    public void publish(CompositeNode notification) {
+        notificationRouter.publish(notification);
+    }
+
+    @Override
+    public Registration<NotificationListener> addNotificationListener(QName notification, NotificationListener listener) {
+        return notificationRouter.addNotificationListener(notification, listener);
+    }
+
+    @Override
+    public CompositeNode readConfigurationData(InstanceIdentifier path) {
+        return dataReader.readConfigurationData(path);
+    }
+
+    @Override
+    public CompositeNode readOperationalData(InstanceIdentifier path) {
+        return dataReader.readOperationalData(path);
+    }
+
+    public Registration<DataReader<InstanceIdentifier, CompositeNode>> registerOperationalReader(
+            InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader) {
+        return dataReader.registerOperationalReader(path, reader);
+    }
+
+    public Registration<DataReader<InstanceIdentifier, CompositeNode>> registerConfigurationReader(
+            InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader) {
+        return dataReader.registerConfigurationReader(path, reader);
+    }
+
+    @Override
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+        return rpcs.addRoutedRpcImplementation(rpcType, implementation);
+    }
+
+    @Override
+    public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException {
+        return rpcs.addRpcImplementation(rpcType, implementation);
+    }
+
+    public Set<QName> getSupportedRpcs() {
+        return rpcs.getSupportedRpcs();
+    }
+
+    
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        return rpcs.invokeRpc(rpc, input);
+    }
+
+    public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
+        return rpcs.addRpcRegistrationListener(listener);
+    }
+
+
+    @Override
+    public Future<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public DataModificationTransaction beginTransaction() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ListenerRegistration<DataChangeListener> registerDataChangeListener(InstanceIdentifier path,
+            DataChangeListener listener) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void sendNotification(CompositeNode notification) {
+        publish(notification);
+        
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend
new file mode 100644 (file)
index 0000000..c64d1e5
--- /dev/null
@@ -0,0 +1,35 @@
+package org.opendaylight.controller.sal.dom.broker
+
+
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.ConcurrentHashMap
+import static com.google.common.base.Preconditions.*;
+
+class MountPointManagerImpl implements MountProvisionService {
+    
+    ConcurrentMap<InstanceIdentifier,MountPointImpl> mounts = new ConcurrentHashMap();
+    
+    override createMountPoint(InstanceIdentifier path) {
+        checkState(!mounts.containsKey(path),"Mount already created");
+        val mount = new MountPointImpl(path);
+        mounts.put(path,mount);
+    }
+    
+    
+    override createOrGetMountPoint(InstanceIdentifier path) {
+        val mount = mounts.get(path);
+        if(mount === null) {
+            return createMountPoint(path)
+        }
+        return mount;
+    }
+    
+    
+    override getMountPoint(InstanceIdentifier path) {
+        mounts.get(path);
+    }
+    
+    
+}
index bffc570..cf5d220 100644 (file)
@@ -1,25 +1,23 @@
 package org.opendaylight.controller.sal.dom.broker
 
-import java.util.Collections
-import java.util.HashMap
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
 import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.yangtools.yang.common.QName
 import org.osgi.framework.BundleContext
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
-import static java.util.Collections.*
-import java.util.Collections
-import java.util.HashMap
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.yangtools.concepts.Registration
+
+import java.util.Set
+import java.util.HashSet
 
 class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession {
 
     @Property
     private val Provider provider;
 
-    private val rpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
+    private val Set<Registration<?>> registrations = new HashSet();
 
     new(Provider provider, BundleContext ctx) {
         super(null, ctx);
@@ -27,39 +25,22 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession
     }
 
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
-        if (rpcType == null) {
-            throw new IllegalArgumentException("rpcType must not be null");
-        }
-        if (implementation == null) {
-            throw new IllegalArgumentException("Implementation must not be null");
-        }
-        broker.addRpcImplementation(rpcType, implementation);
-        rpcImpls.put(rpcType, implementation);
-
-        return new RpcRegistrationImpl(rpcType, implementation, this);
+        val origReg = broker.router.addRpcImplementation(rpcType, implementation);
+        val newReg = new RpcRegistrationWrapper(origReg);
+        registrations.add(newReg);
+        return newReg;
     }
 
-    def removeRpcImplementation(RpcRegistrationImpl implToRemove) throws IllegalArgumentException {
-        val localImpl = rpcImpls.get(implToRemove.type);
-        if (localImpl !== implToRemove.instance) {
-            throw new IllegalStateException("Implementation was not registered in this session");
-        }
-        broker.removeRpcImplementation(implToRemove.type, localImpl);
-        rpcImpls.remove(implToRemove.type);
+    protected def removeRpcImplementation(RpcRegistrationWrapper implToRemove) throws IllegalArgumentException {
+        registrations.remove(implToRemove);
     }
-
+    
     override close() {
-        removeAllRpcImlementations
-        super.close
-    }
-
-    private def removeAllRpcImlementations() {
-        if (!rpcImpls.empty) {
-            for (entry : rpcImpls.entrySet) {
-                broker.removeRpcImplementation(entry.key, entry.value);
-            }
-            rpcImpls.clear
+        
+        for (reg : registrations) {
+            reg.close()
         }
+        super.close
     }
 
     override addMountedRpcImplementation(QName rpcType, RpcImplementation implementation) {
@@ -71,30 +52,34 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession
     }
 
     override getSupportedRpcs() {
-        broker.getSupportedRpcs();
+        broker.router.supportedRpcs;
     }
 
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
-        broker.addRpcRegistrationListener(listener);
+        broker.router.addRpcRegistrationListener(listener);
     }
 }
 
-class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
+class RpcRegistrationWrapper implements RpcRegistration {
+
 
     @Property
-    val QName type
+    val RpcRegistration delegate
 
-    private var ProviderContextImpl context
+    new(RpcRegistration delegate) {
+        _delegate = delegate
+    }
 
-    new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
-        super(instance)
-        _type = type
-        context = ctx
+    override getInstance() {
+        delegate.instance
     }
 
-    override protected removeRegistration() {
-        context.removeRpcImplementation(this)
-        context = null
+    override close() {
+        delegate.close
     }
 
+    override getType() {
+        delegate.type
+    }
 }
+
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend
new file mode 100644 (file)
index 0000000..1e0f338
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.dom.broker.impl
+
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+
+class DataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier, CompositeNode> {
+
+    override protected merge(InstanceIdentifier path, Iterable<CompositeNode> data) {
+        return data.iterator.next
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java
new file mode 100644 (file)
index 0000000..6d7b600
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.dom.broker.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;
+import org.opendaylight.controller.sal.core.spi.BrokerModule;
+import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+public class NotificationRouterImpl implements NotificationRouter {
+    private static Logger log = LoggerFactory.getLogger(NotificationRouterImpl.class);
+
+    private Multimap<QName, Registration<NotificationListener>> listeners = HashMultimap.create();
+
+    private void sendNotification(CompositeNode notification) {
+        QName type = notification.getNodeType();
+        Collection<Registration<NotificationListener>> toNotify = listeners.get(type);
+        log.info("Publishing notification " + type);
+
+        if (toNotify == null) {
+            // No listeners were registered - returns.
+            return;
+        }
+
+        for (Registration<NotificationListener> listener : toNotify) {
+            try {
+                // FIXME: ensure that notification is immutable
+                listener.getInstance().onNotification(notification);
+            } catch (Exception e) {
+                log.error("Uncaught exception in NotificationListener", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void publish(CompositeNode notification) {
+        sendNotification(notification);
+    }
+
+    @Override
+    public Registration<NotificationListener> addNotificationListener(QName notification, NotificationListener listener) {
+        ListenerRegistration ret = new ListenerRegistration(notification, listener);
+        return ret;
+    }
+
+    private class ListenerRegistration extends AbstractObjectRegistration<NotificationListener> {
+
+        final QName type;
+
+        public ListenerRegistration(QName type, NotificationListener instance) {
+            super(instance);
+            this.type = type;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            listeners.remove(type, this);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend
new file mode 100644 (file)
index 0000000..d67697f
--- /dev/null
@@ -0,0 +1,106 @@
+package org.opendaylight.controller.sal.dom.broker.impl
+
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.yangtools.concepts.Identifiable
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import static com.google.common.base.Preconditions.*;
+import java.util.Map
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
+import java.util.concurrent.ConcurrentHashMap
+import java.util.Set
+import java.util.Collections
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.slf4j.LoggerFactory
+
+class RpcRouterImpl implements RpcRouter, Identifiable<String> {
+
+    static val log = LoggerFactory.getLogger(RpcRouterImpl)
+
+    Map<QName, RpcRegistration> implementations = new ConcurrentHashMap();
+
+    @Property
+    val Set<QName> supportedRpcs = Collections.unmodifiableSet(implementations.keySet);
+
+    private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
+
+    @Property
+    val String identifier;
+
+    new(String name) {
+        _identifier = name;
+    }
+
+    override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+    }
+
+    override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
+        checkNotNull(rpcType, "Rpc Type should not be null");
+        checkNotNull(implementation, "Implementation should not be null.");
+        checkState(!implementations.containsKey(rpcType), "Provider for supplied rpc is already registered.");
+        val reg = new RpcRegistrationImpl(rpcType, implementation, this);
+        implementations.put(rpcType, reg)
+
+        for (listener : rpcRegistrationListeners.listeners) {
+            try {
+                listener.instance.onRpcImplementationAdded(rpcType);
+            } catch (Exception e) {
+                log.error("Unhandled exception during invoking listener", e);
+            }
+        }
+
+        return reg;
+
+    }
+
+    override invokeRpc(QName rpc, CompositeNode input) {
+        checkNotNull(rpc, "Rpc Type should not be null");
+
+        val impl = implementations.get(rpc);
+        checkState(impl !== null, "Provider for supplied rpc is not registered.");
+
+        return impl.instance.invokeRpc(rpc, input);
+    }
+
+    def remove(RpcRegistrationImpl impl) {
+        val existing = implementations.get(impl.type);
+        if (existing == impl) {
+            implementations.remove(impl.type);
+        }
+        for (listener : rpcRegistrationListeners.listeners) {
+            try {
+                listener.instance.onRpcImplementationRemoved(impl.type);
+            } catch (Exception e) {
+                log.error("Unhandled exception during invoking listener", e);
+            }
+        }
+    }
+    
+    override addRpcRegistrationListener(RpcRegistrationListener listener) {
+        rpcRegistrationListeners.register(listener);
+    }
+
+}
+
+class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
+
+    @Property
+    val QName type;
+
+    @Property
+    var RpcRouterImpl router;
+
+    new(QName type, RpcImplementation instance, RpcRouterImpl router) {
+        super(instance)
+        _type = type
+        _router = router
+    }
+
+    override protected removeRegistration() {
+        router.remove(this);
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java
new file mode 100644 (file)
index 0000000..fc42d2c
--- /dev/null
@@ -0,0 +1,21 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface NotificationRouter {
+
+    void publish(CompositeNode notification);
+
+    /**
+     * Registers a notification listener for supplied notification type.
+     * 
+     * @param notification
+     * @param listener
+     */
+    Registration<NotificationListener> addNotificationListener(QName notification,
+            NotificationListener listener);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java
new file mode 100644 (file)
index 0000000..97c2a15
--- /dev/null
@@ -0,0 +1,27 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface RoutedRpcProcessor extends RpcImplementation {
+
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+
+    public Set<QName> getSupportedRpcs();
+
+    public QName getRpcType();
+    
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+
+    Map<InstanceIdentifier,RpcImplementation> getRoutes();
+    
+    RpcImplementation getDefaultRoute();
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java
new file mode 100644 (file)
index 0000000..6886f89
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
+
+    @Override
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+    
+    @Override
+    public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException;
+    
+    @Override
+    public Set<QName> getSupportedRpcs();
+    
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+
+    ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
+}
index c8bc715..57e4d85 100644 (file)
@@ -6,6 +6,9 @@
         <artifactId>sal-parent</artifactId>
         <version>1.0-SNAPSHOT</version>
     </parent>
+    <properties>
+        <netconf.version>0.2.2-SNAPSHOT</netconf.version>
+    </properties>
     <artifactId>sal-netconf-connector</artifactId>
     <scm>
         <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
             <groupId>${project.groupId}</groupId>
             <artifactId>sal-connector-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>sal-common-util</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.xtend</groupId>
+            <artifactId>org.eclipse.xtend.lib</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>netconf-client</artifactId>
+            <version>0.2.2-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.yangtools</groupId>
+            <artifactId>yang-data-impl</artifactId>
+            <version>0.5.9-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>yang-test</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-api</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-util</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>yang-store-api</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-api</artifactId>
+            <version>${netconf.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.bgpcep</groupId>
+            <artifactId>util</artifactId>
+            <scope>test</scope>
+            <version>0.3.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-client</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-netconf-connector</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>yang-test</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-manager</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-persister-impl</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>config-manager</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-impl</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-mapping-api</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>netconf-util</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>yang-store-impl</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>yang-store-impl</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>logback-config</artifactId>
+            <scope>test</scope>
+            <version>${netconf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 
     <packaging>bundle</packaging>