Merge "Unification of broker concepts implementations"
authorEd Warnicke <eaw@cisco.com>
Thu, 7 Nov 2013 14:06:23 +0000 (14:06 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 7 Nov 2013 14:06:23 +0000 (14:06 +0000)
30 files changed:
opendaylight/md-sal/sal-binding-api/pom.xml
opendaylight/md-sal/sal-binding-broker/pom.xml
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataChange.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataCommitHandler.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/pom.xml
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/data/DataProviderService.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/pom.xml
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/ProviderContextImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java [new file with mode: 0644]

index c1dccdf5320a488a6b4c38a7464eb10e4d53af24..cfbd4f7b719e0f5217fbee4dba444b38ca0e9c7a 100644 (file)
@@ -38,6 +38,7 @@
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
             <version>${osgi.core.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 </project>
index b8b37af58ee42eec8f1ca2b50763013f20c935f8..9ca025b393911d11ede993899a32e2488939c8a2 100644 (file)
                 <configuration>
                     <instructions>
                         <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+                        <Export-package>
+                            org.opendaylight.controller.sal.binding.spi.*,
+                        </Export-package>
                         <Private-Package>
                             org.opendaylight.controller.config.yang.md.sal.binding.impl,
-                            org.opendaylight.controller.sal.binding.spi,
-                            org.opendaylight.controller.sal.binding.spi.*,
                             org.opendaylight.controller.sal.binding.impl,
                             org.opendaylight.controller.sal.binding.impl.*,
+                            org.opendaylight.controller.sal.binding.codegen,
                             org.opendaylight.controller.sal.binding.codegen.*,
                         </Private-Package>
                     </instructions>
index 9356ecda88315a0d055d2279070e407321422f38..6ed63b21ddb4a67d3485c008c4417d19131367c9 100644 (file)
@@ -5,23 +5,13 @@ import org.opendaylight.controller.sal.binding.api.data.DataChangeListener
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
 import org.opendaylight.yangtools.yang.binding.DataObject
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction.DataTransactionListener
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
 import org.opendaylight.controller.md.sal.common.api.data.DataReader
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.yangtools.concepts.ListenerRegistration
-import static extension org.opendaylight.controller.sal.binding.impl.util.MapUtils.*;
-import java.util.Collection
-import java.util.Map.Entry
-import java.util.HashSet
-import java.util.Set
 import com.google.common.collect.Multimap
 import static com.google.common.base.Preconditions.*;
 import java.util.List
-import java.util.LinkedList
-import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
 import com.google.common.collect.HashMultimap
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Callable
@@ -30,15 +20,17 @@ import org.opendaylight.controller.sal.common.util.Rpcs
 import java.util.Collections
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
 import java.util.ArrayList
-import org.opendaylight.controller.sal.common.util.RpcErrors
+import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
+import java.util.Arrays
 
 class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderService {
 
     @Property
     var ExecutorService executor;
 
-    Multimap<InstanceIdentifier, DataReaderRegistration> configReaders = HashMultimap.create();
-    Multimap<InstanceIdentifier, DataReaderRegistration> operationalReaders = HashMultimap.create();
+    val dataReadRouter = new BindingAwareDataReaderRouter;
+
     Multimap<InstanceIdentifier, DataChangeListenerRegistration> listeners = HashMultimap.create();
     Multimap<InstanceIdentifier, DataCommitHandlerRegistration> commitHandlers = HashMultimap.create();
 
@@ -47,13 +39,11 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
     }
 
     override readConfigurationData(InstanceIdentifier<? extends DataObject> path) {
-        val readers = configReaders.getAllChildren(path);
-        return readers.readConfiguration(path);
+        return dataReadRouter.readConfigurationData(path);
     }
 
     override readOperationalData(InstanceIdentifier<? extends DataObject> path) {
-        val readers = operationalReaders.getAllChildren(path);
-        return readers.readOperational(path);
+        return dataReadRouter.readOperationalData(path);
     }
 
     override registerCommitHandler(InstanceIdentifier<? extends DataObject> path,
@@ -69,20 +59,12 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
         return reg;
     }
 
-    override registerDataReader(InstanceIdentifier<? extends DataObject> path,
-        DataReader<InstanceIdentifier<? extends DataObject>, DataObject> provider) {
-        val ret = new DataReaderRegistration(provider, this);
-        ret.paths.add(path);
-        configReaders.put(path, ret);
-        operationalReaders.put(path, ret);
-        return ret;
-    }
-
-    protected def removeReader(DataReaderRegistration reader) {
-        for (path : reader.paths) {
-            operationalReaders.remove(path, reader);
-            configReaders.remove(path, reader);
-        }
+    override registerDataReader(InstanceIdentifier<? extends DataObject> path,DataReader<InstanceIdentifier<? extends DataObject>,DataObject> reader) {
+        
+        val confReg = dataReadRouter.registerConfigurationReader(path,reader);
+        val dataReg = dataReadRouter.registerOperationalReader(path,reader);
+        
+        return new CompositeObjectRegistration(reader,Arrays.asList(confReg,dataReg));
     }
 
     protected def removeListener(DataChangeListenerRegistration registration) {
@@ -92,39 +74,8 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
     protected def removeCommitHandler(DataCommitHandlerRegistration registration) {
         commitHandlers.remove(registration.path, registration);
     }
-
-    protected def DataObject readConfiguration(
-        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
-        InstanceIdentifier<? extends DataObject> path) {
-
-        val List<DataObject> partialResults = new LinkedList();
-        for (entry : entries) {
-            partialResults.add(entry.value.instance.readConfigurationData(path))
-        }
-        return merge(path, partialResults);
-    }
-
-    protected def DataObject readOperational(
-        Collection<Entry<? extends InstanceIdentifier, ? extends DataReaderRegistration>> entries,
-        InstanceIdentifier<? extends DataObject> path) {
-
-        val List<DataObject> partialResults = new LinkedList();
-        for (entry : entries) {
-            partialResults.add(entry.value.instance.readOperationalData(path))
-        }
-        return merge(path, partialResults);
-    }
-
-    protected def DataObject merge(InstanceIdentifier<? extends DataObject> identifier, List<DataObject> objects) {
-
-        // FIXME: implement real merge
-        if (objects.size > 0) {
-            return objects.get(0);
-        }
-    }
     
     protected def getActiveCommitHandlers() {
-        
         return commitHandlers.entries.map[ value.instance].toSet
     }
 
@@ -137,26 +88,6 @@ class DataBrokerImpl extends DeprecatedDataAPISupport implements DataProviderSer
 
 }
 
-package class DataReaderRegistration extends //
-AbstractObjectRegistration<DataReader<InstanceIdentifier<? extends DataObject>, DataObject>> {
-
-    DataBrokerImpl dataBroker;
-
-    @Property
-    val Set<InstanceIdentifier<? extends DataObject>> paths;
-
-    new(DataReader<InstanceIdentifier<? extends DataObject>, DataObject> instance, DataBrokerImpl broker) {
-        super(instance)
-        dataBroker = broker;
-        _paths = new HashSet();
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeReader(this);
-    }
-
-}
-
 package class DataChangeListenerRegistration extends AbstractObjectRegistration<DataChangeListener> implements ListenerRegistration<DataChangeListener> {
 
     DataBrokerImpl dataBroker;
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataProviderContext.xtend
deleted file mode 100644 (file)
index 398a219..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl
-
-import org.opendaylight.controller.sal.common.DataStoreIdentifier
-import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider
-
-class DataProviderContext {
-
-    @Property
-    var DataStoreIdentifier identifier;
-    @Property
-    var RuntimeDataProvider provider;
-}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/util/BindingAwareDataReaderRouter.xtend
new file mode 100644 (file)
index 0000000..f586a8b
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.binding.impl.util
+
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
+import org.opendaylight.yangtools.yang.binding.DataObject
+
+class BindingAwareDataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier<? extends DataObject>, DataObject> {
+    
+    override protected merge(InstanceIdentifier<? extends DataObject> path, Iterable<DataObject> data) {
+        return data.iterator.next;
+    }
+    
+}
\ No newline at end of file
index 0fea50b777f14dc31275f9b75c7d1bf4622f6ece..55565252a2b7a408579f46e05b9794cbe707d81a 100644 (file)
@@ -13,19 +13,102 @@ import java.util.Set;
 // FIXME: After 0.6 Release of YANGTools refactor to use Path marker interface for arguments.
 // import org.opendaylight.yangtools.concepts.Path;
 
+public interface DataChange<P/* extends Path<P> */, D> {
 
-public interface DataChange<P/* extends Path<P> */,D> {
+    /**
+     * Returns a map of paths and newly created objects
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getCreatedOperationalData();
 
-    Map<P,D> getCreatedOperationalData();
+    /**
+     * Returns a map of paths and newly created objects
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getCreatedConfigurationData();
 
-    Map<P,D> getUpdatedOperationalData();
+    /**
+     * Returns a map of paths and respective updated objects after update.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalOperationalData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getUpdatedOperationalData();
 
+    /**
+     * Returns a map of paths and respective updated objects after update.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalConfigurationData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Map<P, D> getUpdatedConfigurationData();
+
+    /**
+     * Returns a set of paths of removed objects.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalConfigurationData()}
+     * 
+     * @return map of paths and newly created objects
+     */
+    Set<P> getRemovedConfigurationData();
+
+    /**
+     * Returns a set of paths of removed objects.
+     * 
+     * Original state of the object is in
+     * {@link #getOriginalOperationalData()}
+     * 
+     * @return map of paths and newly created objects
+     */
     Set<P> getRemovedOperationalData();
 
-    Map<P,D> getCreatedConfigurationData();
+    /**
+     * Return a map of paths and original state of updated and removed objectd.
+     * 
+     * @return map of paths and original state of updated and removed objectd.
+     */
+    Map<P, D> getOriginalConfigurationData();
 
-    Map<P,D> getUpdatedConfigurationData();
+    /**
+     * Return a map of paths and original state of updated and removed objectd.
+     * 
+     * @return map of paths and original state of updated and removed objectd.
+     */
+    Map<P, D> getOriginalOperationalData();
 
-    Set<P> getRemovedConfigurationData();
+    /**
+     * Returns a original subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getOriginalConfigurationSubtree();
+
+    /**
+     * Returns a original subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getOriginalOperationalSubtree();
+
+    /**
+     * Returns a new subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getUpdatedConfigurationSubtree();
+
+    /**
+     * Returns a new subtree of data, which starts at the path
+     * where listener was registered.
+     * 
+     */
+    D getUpdatedOperationalSubtree();
 
 }
index 85e3d8f57ca4a6a2de3913b4363ba5e1fb0af651..90de13d15e8229238f36c5ec59b1180babcff1ae 100644 (file)
@@ -84,7 +84,7 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
  */
 public interface DataCommitHandler<P/* extends Path<P> */,D> {
 
-
+    
     DataCommitTransaction<P, D> requestCommit(DataModification<P,D> modification);
 
     public interface DataCommitTransaction<P/* extends Path<P> */,D> {
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/DataStore.java
new file mode 100644 (file)
index 0000000..f448d4e
--- /dev/null
@@ -0,0 +1,16 @@
+package org.opendaylight.controller.md.sal.common.api.data;
+
+public interface DataStore<P, D> extends //
+        DataReader<P, D>, //
+        DataModificationTransactionFactory<P, D> {
+
+    @Override
+    public DataModification<P, D> beginTransaction();
+
+    @Override
+    public D readConfigurationData(P path);
+
+    @Override
+    public D readOperationalData(P path);
+
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Route.java
new file mode 100644 (file)
index 0000000..afe9e99
--- /dev/null
@@ -0,0 +1,10 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import org.opendaylight.yangtools.concepts.Immutable;
+
+public interface Route<C,P> extends Immutable {
+
+    C getType();
+    
+    P getPath();
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java
new file mode 100644 (file)
index 0000000..994f65b
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import java.util.EventListener;
+
+public interface RouteChangeListener<C,P> extends EventListener {
+
+    void onRouteChange(RouteChange<C, P> change);
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java
new file mode 100644 (file)
index 0000000..89851c9
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+public interface RouteChangePublisher<C,P> {
+
+    ListenerRegistration<RouteChangeListener<C,P>> registerRouteChangeListener(RouteChangeListener<C,P> listener);
+}
diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/Router.java
new file mode 100644 (file)
index 0000000..8d0a90c
--- /dev/null
@@ -0,0 +1,10 @@
+package org.opendaylight.controller.md.sal.common.api.routing;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface Router<C,P,D> extends //
+        RouteChangePublisher<C, P> {
+
+    Map<C, Set<P>> getAnnouncedPaths();
+}
index 0c2344a23e916e45ab43ace0a9e0cd3866cb266a..3bd51ec7d8b68dabe567857101a098cf2d2f4928 100644 (file)
         <artifactId>maven-bundle-plugin</artifactId>
         <configuration>
           <instructions>
-            <Export-Package>org.opendaylight.controller.md.sal.common.impl</Export-Package>
+            <Export-Package>
+                org.opendaylight.controller.md.sal.common.impl,
+                org.opendaylight.controller.md.sal.common.impl.*
+            </Export-Package>
           </instructions>
         </configuration>
       </plugin>
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/AbstractDataReadRouter.java
new file mode 100644 (file)
index 0000000..f83c61f
--- /dev/null
@@ -0,0 +1,187 @@
+package org.opendaylight.controller.md.sal.common.impl.routing;
+
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.concepts.Registration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Base abstract implementation of DataReadRouter, which performs
+ * a read operation on multiple data readers and then merges result.
+ * 
+ * @param <P>
+ * @param <D>
+ */
+public abstract class AbstractDataReadRouter<P extends Path<?>, D> implements DataReader<P, D> {
+
+    Multimap<P, DataReaderRegistration<P, D>> configReaders = HashMultimap.create();
+    Multimap<P, DataReaderRegistration<P, D>> operationalReaders = HashMultimap.create();
+
+    @Override
+    public D readConfigurationData(P path) {
+        FluentIterable<D> dataBits = FluentIterable //
+                .from(getReaders(configReaders, path)).transform(configurationRead(path));
+        return merge(path,dataBits);
+    }
+
+    @Override
+    public D readOperationalData(P path) {
+        FluentIterable<D> dataBits = FluentIterable //
+                .from(getReaders(configReaders, path)).transform(operationalRead(path));
+        return merge(path,dataBits);
+
+    }
+
+    /**
+     * Merges data readed by reader instances from specified path
+     * 
+     * @param path Path on which read was performed
+     * @param data Data which was returned by read operation.
+     * @return Merged result.
+     */
+    protected abstract D merge(P path,Iterable<D> data);
+
+    /**
+     * Returns a function which performs configuration read for supplied path
+     * 
+     * @param path
+     * @return function which performs configuration read for supplied path
+     */
+    
+    private Function<DataReader<P, D>, D> configurationRead(final P path) {
+        return new Function<DataReader<P, D>, D>() {
+            @Override
+            public D apply(DataReader<P, D> input) {
+                return input.readConfigurationData(path);
+            }
+        };
+    }
+
+    /**
+     * Returns a function which performs operational read for supplied path
+     * 
+     * @param path
+     * @return function which performs operational read for supplied path
+     */
+    private Function<DataReader<P, D>, D> operationalRead(final P path) {
+        return new Function<DataReader<P, D>, D>() {
+            @Override
+            public D apply(DataReader<P, D> input) {
+                return input.readConfigurationData(path);
+            }
+        };
+    }
+
+    // Registrations
+
+    /**
+     * Register's a reader for operational data.
+     * 
+     * @param path Path which is served by this reader
+     * @param reader Reader instance which is responsible for reading particular subpath.
+     * @return 
+     */
+    public Registration<DataReader<P, D>> registerOperationalReader(P path, DataReader<P, D> reader) {
+        OperationalDataReaderRegistration<P, D> ret = new OperationalDataReaderRegistration<>(path, reader);
+        operationalReaders.put(path, ret);
+        return ret;
+    }
+
+    public Registration<DataReader<P, D>> registerConfigurationReader(P path, DataReader<P, D> reader) {
+        ConfigurationDataReaderRegistration<P, D> ret = new ConfigurationDataReaderRegistration<>(path, reader);
+        configReaders.put(path, ret);
+        return ret;
+    }
+
+    Iterable<DataReader<P, D>> getOperationalReaders(P path) {
+        return getReaders(operationalReaders, path);
+    }
+
+    Iterable<DataReader<P, D>> getConfigurationReaders(P path) {
+        return getReaders(configReaders, path);
+    }
+
+    private Iterable<DataReader<P, D>> getReaders(Multimap<P, DataReaderRegistration<P, D>> readerMap, P path) {
+        return FluentIterable
+            .from(readerMap.entries()) //
+            .filter(affects(path)) //
+            .transform(retrieveInstance());
+    }
+
+    private void removeRegistration(OperationalDataReaderRegistration<?, ?> registration) {
+        operationalReaders.remove(registration.getKey(), registration);
+    }
+
+    private void removeRegistration(ConfigurationDataReaderRegistration<?, ?> registration) {
+        configReaders.remove(registration.getKey(), registration);
+    }
+
+    private Function<? super Entry<P, DataReaderRegistration<P, D>>, DataReader<P, D>> retrieveInstance() {
+        return new Function<Entry<P, DataReaderRegistration<P, D>>, DataReader<P,D>>() {
+            @Override
+            public DataReader<P, D> apply(Entry<P, DataReaderRegistration<P, D>> input) {
+                return input.getValue().getInstance();
+            }
+        };
+    }
+
+    private Predicate<? super Entry<P, DataReaderRegistration<P, D>>> affects(final P path) {
+        
+        return new Predicate<Entry<P, DataReaderRegistration<P, D>>>() {
+            
+            @Override
+            public boolean apply(Entry<P, DataReaderRegistration<P, D>> input) {
+                final Path key = input.getKey();
+                return key.contains(path) || ((Path) path).contains(key);
+            }
+            
+        };
+    }
+
+    private class ConfigurationDataReaderRegistration<P extends Path<?>, D> extends DataReaderRegistration<P, D> {
+
+        public ConfigurationDataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(key, instance);
+        }
+
+        @Override
+        protected void removeRegistration() {
+            AbstractDataReadRouter.this.removeRegistration(this);
+        }
+    }
+
+    private class OperationalDataReaderRegistration<P extends Path<?>, D> extends DataReaderRegistration<P, D> {
+
+        public OperationalDataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(key, instance);
+        }
+
+        @Override
+        protected void removeRegistration() {
+            AbstractDataReadRouter.this.removeRegistration(this);
+        }
+    }
+
+    private abstract static class DataReaderRegistration<P extends Path<?>, D> extends
+            AbstractObjectRegistration<DataReader<P, D>> {
+
+        private final P key;
+
+        public P getKey() {
+            return this.key;
+        }
+
+        public DataReaderRegistration(P key, DataReader<P, D> instance) {
+            super(instance);
+            this.key = key;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcConsumptionRegistry.java
new file mode 100644 (file)
index 0000000..c19ee1a
--- /dev/null
@@ -0,0 +1,22 @@
+package org.opendaylight.controller.sal.core.api;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface RpcConsumptionRegistry {
+    /**
+     * Sends an RPC to other components registered to the broker.
+     * 
+     * @see RpcImplementation
+     * @param rpc
+     *            Name of RPC
+     * @param input
+     *            Input data to the RPC
+     * @return Result of the RPC call
+     */
+    Future<RpcResult<CompositeNode>> rpc(QName rpc, CompositeNode input);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java
new file mode 100644 (file)
index 0000000..c326bab
--- /dev/null
@@ -0,0 +1,33 @@
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+
+public interface RpcProvisionRegistry {
+
+    /**
+     * Registers an implementation of the rpc.
+     * 
+     * <p>
+     * The registered rpc functionality will be available to all other
+     * consumers and providers registered to the broker, which are aware of
+     * the {@link QName} assigned to the rpc.
+     * 
+     * <p>
+     * There is no assumption that rpc type is in the set returned by
+     * invoking {@link RpcImplementation#getSupportedRpcs()}. This allows
+     * for dynamic rpc implementations.
+     * 
+     * @param rpcType
+     *            Name of Rpc
+     * @param implementation
+     *            Provider's Implementation of the RPC functionality
+     * @throws IllegalArgumentException
+     *             If the name of RPC is invalid
+     */
+    RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException;
+
+    RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+}
index 3024c89d615b4723041fe0707f5459a08f698b1b..20fa29dceb7cd807ca78c2650d357f5a30bb0cb1 100644 (file)
@@ -10,8 +10,10 @@ package org.opendaylight.controller.sal.core.api.data;
 import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
 import org.opendaylight.controller.sal.common.DataStoreIdentifier;
 import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;;
 
 public interface DataProviderService extends 
     DataBrokerService, //
@@ -54,6 +56,11 @@ public interface DataProviderService extends
      */
     void removeRefresher(DataStoreIdentifier store, DataRefresher refresher);
 
+    
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> registerConfigurationReader(InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader);
+
+    Registration<DataReader<InstanceIdentifier, CompositeNode>> registerOperationalReader(InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader);
+    
     public interface DataRefresher extends Provider.ProviderFunctionality {
 
         /**
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionInstance.java
new file mode 100644 (file)
index 0000000..8f6a5d0
--- /dev/null
@@ -0,0 +1,8 @@
+package org.opendaylight.controller.sal.core.api.mount;
+
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+
+public interface MountProvisionInstance extends MountInstance, NotificationPublishService, RpcProvisionRegistry {
+
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/mount/MountProvisionService.java
new file mode 100644 (file)
index 0000000..fade7d3
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.core.api.mount;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface MountProvisionService extends MountService {
+
+    @Override
+    public MountProvisionInstance getMountPoint(InstanceIdentifier path);
+    
+    MountProvisionInstance createMountPoint(InstanceIdentifier path);
+    
+    MountProvisionInstance createOrGetMountPoint(InstanceIdentifier path);
+}
index 678728ab3b86b8d0883dce0f613f2cd42b6e5129..9383a9e2ff50ee2149d1ce034ed6f195dfb0e7e6 100644 (file)
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-common-util</artifactId>\r
             <version>1.0-SNAPSHOT</version>\r
+        </dependency>\r
+                <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>sal-common-impl</artifactId>\r
+            <version>1.0-SNAPSHOT</version>\r
         </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
@@ -60,7 +65,7 @@
                         <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>\r
                         <Bundle-Activator>org.opendaylight.controller.sal.dom.broker.BrokerActivator</Bundle-Activator>\r
                         <Private-Package>\r
-                            org.opendaylight.controller.sal.dom.broker,\r
+                            org.opendaylight.controller.sal.dom.broker.*\r
                         </Private-Package>\r
                     </instructions>\r
                 </configuration>\r
index 83dda5902dee4e6fd819debabcaecd12ca0413e8..855ad9bd328d19c79b30e09ba4e485e0d1918044 100644 (file)
@@ -7,29 +7,28 @@
  */
 package org.opendaylight.controller.sal.dom.broker;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.Consumer;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-import org.slf4j.LoggerFactory;
+import java.util.Collections
+import java.util.HashMap
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import java.util.concurrent.Callable
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import org.opendaylight.controller.sal.core.api.Broker
+import org.opendaylight.controller.sal.core.api.BrokerService
+import org.opendaylight.controller.sal.core.api.Consumer
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.spi.BrokerModule
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.common.RpcResult
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.osgi.framework.BundleContext
+import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.yangtools.concepts.ListenerRegistration
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
 
 public class BrokerImpl implements Broker {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
@@ -42,17 +41,14 @@ public class BrokerImpl implements Broker {
     private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
         synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
 
-
-    private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
-    // RPC Context
-    private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
-        new HashMap<QName, RpcImplementation>());
-
     // Implementation specific
     @Property
     private var ExecutorService executor = Executors.newFixedThreadPool(5);
     @Property
     private var BundleContext bundleContext;
+    
+    @Property
+    private var RpcRouter router;
 
     override registerConsumer(Consumer consumer, BundleContext ctx) {
         checkPredicates(consumer);
@@ -95,42 +91,8 @@ public class BrokerImpl implements Broker {
         return prov.getServiceForSession(service, session);
     }
 
-    // RPC Functionality
-    protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
-        if(rpcImpls.get(rpcType) != null) {
-            throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
-        }
-
-        
-        rpcImpls.put(rpcType, implementation);
-
-        
-        for(listener : rpcRegistrationListeners.listeners)  {
-            try {
-                listener.instance.onRpcImplementationAdded(rpcType);
-            } catch (Exception e){
-                log.error("Unhandled exception during invoking listener",e);
-            }
-        }
-    }
-
-    protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
-        if(implToRemove == rpcImpls.get(rpcType)) {
-            rpcImpls.remove(rpcType);
-        }
-        
-        for(listener : rpcRegistrationListeners.listeners)  {
-            try {
-                listener.instance.onRpcImplementationRemoved(rpcType);
-            } catch (Exception e){
-                log.error("Unhandled exception during invoking listener",e);
-            }
-        }
-    }
-
     protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
-        val impl = rpcImpls.get(rpc);
-        val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
+        val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
         return result;
     }
 
@@ -171,12 +133,4 @@ public class BrokerImpl implements Broker {
         sessions.remove(consumerContextImpl);
         providerSessions.remove(consumerContextImpl);
     }
-    
-    protected def getSupportedRpcs() {
-        rpcImpls.keySet;
-    }
-    
-    def ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
-        rpcRegistrationListeners.register(listener);
-    }
 }
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
new file mode 100644 (file)
index 0000000..7037b46
--- /dev/null
@@ -0,0 +1,118 @@
+package org.opendaylight.controller.sal.dom.broker;
+
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.dom.broker.impl.DataReaderRouter;
+import org.opendaylight.controller.sal.dom.broker.impl.NotificationRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl;
+import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class MountPointImpl implements MountProvisionInstance {
+
+    final RpcRouter rpcs;
+    final DataReaderRouter dataReader;
+    final NotificationRouter notificationRouter;
+
+    public MountPointImpl(InstanceIdentifier path) {
+        rpcs = new RpcRouterImpl("");
+        dataReader = new DataReaderRouter();
+        notificationRouter = new NotificationRouterImpl();
+    }
+
+    @Override
+    public void publish(CompositeNode notification) {
+        notificationRouter.publish(notification);
+    }
+
+    @Override
+    public Registration<NotificationListener> addNotificationListener(QName notification, NotificationListener listener) {
+        return notificationRouter.addNotificationListener(notification, listener);
+    }
+
+    @Override
+    public CompositeNode readConfigurationData(InstanceIdentifier path) {
+        return dataReader.readConfigurationData(path);
+    }
+
+    @Override
+    public CompositeNode readOperationalData(InstanceIdentifier path) {
+        return dataReader.readOperationalData(path);
+    }
+
+    public Registration<DataReader<InstanceIdentifier, CompositeNode>> registerOperationalReader(
+            InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader) {
+        return dataReader.registerOperationalReader(path, reader);
+    }
+
+    public Registration<DataReader<InstanceIdentifier, CompositeNode>> registerConfigurationReader(
+            InstanceIdentifier path, DataReader<InstanceIdentifier, CompositeNode> reader) {
+        return dataReader.registerConfigurationReader(path, reader);
+    }
+
+    @Override
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+        return rpcs.addRoutedRpcImplementation(rpcType, implementation);
+    }
+
+    @Override
+    public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException {
+        return rpcs.addRpcImplementation(rpcType, implementation);
+    }
+
+    public Set<QName> getSupportedRpcs() {
+        return rpcs.getSupportedRpcs();
+    }
+
+    
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+        return rpcs.invokeRpc(rpc, input);
+    }
+
+    public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
+        return rpcs.addRpcRegistrationListener(listener);
+    }
+
+
+    @Override
+    public Future<RpcResult<CompositeNode>> rpc(QName type, CompositeNode input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public DataModificationTransaction beginTransaction() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ListenerRegistration<DataChangeListener> registerDataChangeListener(InstanceIdentifier path,
+            DataChangeListener listener) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void sendNotification(CompositeNode notification) {
+        publish(notification);
+        
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointManagerImpl.xtend
new file mode 100644 (file)
index 0000000..c64d1e5
--- /dev/null
@@ -0,0 +1,35 @@
+package org.opendaylight.controller.sal.dom.broker
+
+
+import org.opendaylight.controller.sal.core.api.mount.MountProvisionService
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.ConcurrentHashMap
+import static com.google.common.base.Preconditions.*;
+
+class MountPointManagerImpl implements MountProvisionService {
+    
+    ConcurrentMap<InstanceIdentifier,MountPointImpl> mounts = new ConcurrentHashMap();
+    
+    override createMountPoint(InstanceIdentifier path) {
+        checkState(!mounts.containsKey(path),"Mount already created");
+        val mount = new MountPointImpl(path);
+        mounts.put(path,mount);
+    }
+    
+    
+    override createOrGetMountPoint(InstanceIdentifier path) {
+        val mount = mounts.get(path);
+        if(mount === null) {
+            return createMountPoint(path)
+        }
+        return mount;
+    }
+    
+    
+    override getMountPoint(InstanceIdentifier path) {
+        mounts.get(path);
+    }
+    
+    
+}
index bffc5705962b3fd7f2ad790a11214dee8166cbc7..cf5d220943142ad83c948299aeef3222d882b136 100644 (file)
@@ -1,25 +1,23 @@
 package org.opendaylight.controller.sal.dom.broker
 
-import java.util.Collections
-import java.util.HashMap
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession
 import org.opendaylight.controller.sal.core.api.Provider
 import org.opendaylight.controller.sal.core.api.RpcImplementation
 import org.opendaylight.yangtools.yang.common.QName
 import org.osgi.framework.BundleContext
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
-import static java.util.Collections.*
-import java.util.Collections
-import java.util.HashMap
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.yangtools.concepts.Registration
+
+import java.util.Set
+import java.util.HashSet
 
 class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession {
 
     @Property
     private val Provider provider;
 
-    private val rpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
+    private val Set<Registration<?>> registrations = new HashSet();
 
     new(Provider provider, BundleContext ctx) {
         super(null, ctx);
@@ -27,39 +25,22 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession
     }
 
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
-        if (rpcType == null) {
-            throw new IllegalArgumentException("rpcType must not be null");
-        }
-        if (implementation == null) {
-            throw new IllegalArgumentException("Implementation must not be null");
-        }
-        broker.addRpcImplementation(rpcType, implementation);
-        rpcImpls.put(rpcType, implementation);
-
-        return new RpcRegistrationImpl(rpcType, implementation, this);
+        val origReg = broker.router.addRpcImplementation(rpcType, implementation);
+        val newReg = new RpcRegistrationWrapper(origReg);
+        registrations.add(newReg);
+        return newReg;
     }
 
-    def removeRpcImplementation(RpcRegistrationImpl implToRemove) throws IllegalArgumentException {
-        val localImpl = rpcImpls.get(implToRemove.type);
-        if (localImpl !== implToRemove.instance) {
-            throw new IllegalStateException("Implementation was not registered in this session");
-        }
-        broker.removeRpcImplementation(implToRemove.type, localImpl);
-        rpcImpls.remove(implToRemove.type);
+    protected def removeRpcImplementation(RpcRegistrationWrapper implToRemove) throws IllegalArgumentException {
+        registrations.remove(implToRemove);
     }
-
+    
     override close() {
-        removeAllRpcImlementations
-        super.close
-    }
-
-    private def removeAllRpcImlementations() {
-        if (!rpcImpls.empty) {
-            for (entry : rpcImpls.entrySet) {
-                broker.removeRpcImplementation(entry.key, entry.value);
-            }
-            rpcImpls.clear
+        
+        for (reg : registrations) {
+            reg.close()
         }
+        super.close
     }
 
     override addMountedRpcImplementation(QName rpcType, RpcImplementation implementation) {
@@ -71,30 +52,34 @@ class ProviderContextImpl extends ConsumerContextImpl implements ProviderSession
     }
 
     override getSupportedRpcs() {
-        broker.getSupportedRpcs();
+        broker.router.supportedRpcs;
     }
 
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
-        broker.addRpcRegistrationListener(listener);
+        broker.router.addRpcRegistrationListener(listener);
     }
 }
 
-class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
+class RpcRegistrationWrapper implements RpcRegistration {
+
 
     @Property
-    val QName type
+    val RpcRegistration delegate
 
-    private var ProviderContextImpl context
+    new(RpcRegistration delegate) {
+        _delegate = delegate
+    }
 
-    new(QName type, RpcImplementation instance, ProviderContextImpl ctx) {
-        super(instance)
-        _type = type
-        context = ctx
+    override getInstance() {
+        delegate.instance
     }
 
-    override protected removeRegistration() {
-        context.removeRpcImplementation(this)
-        context = null
+    override close() {
+        delegate.close
     }
 
+    override getType() {
+        delegate.type
+    }
 }
+
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/DataReaderRouter.xtend
new file mode 100644 (file)
index 0000000..1e0f338
--- /dev/null
@@ -0,0 +1,13 @@
+package org.opendaylight.controller.sal.dom.broker.impl
+
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+
+class DataReaderRouter extends AbstractDataReadRouter<InstanceIdentifier, CompositeNode> {
+
+    override protected merge(InstanceIdentifier path, Iterable<CompositeNode> data) {
+        return data.iterator.next
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/NotificationRouterImpl.java
new file mode 100644 (file)
index 0000000..6d7b600
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.dom.broker.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService;
+import org.opendaylight.controller.sal.core.api.notify.NotificationService;
+import org.opendaylight.controller.sal.core.spi.BrokerModule;
+import org.opendaylight.controller.sal.dom.broker.spi.NotificationRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+public class NotificationRouterImpl implements NotificationRouter {
+    private static Logger log = LoggerFactory.getLogger(NotificationRouterImpl.class);
+
+    private Multimap<QName, Registration<NotificationListener>> listeners = HashMultimap.create();
+
+    private void sendNotification(CompositeNode notification) {
+        QName type = notification.getNodeType();
+        Collection<Registration<NotificationListener>> toNotify = listeners.get(type);
+        log.info("Publishing notification " + type);
+
+        if (toNotify == null) {
+            // No listeners were registered - returns.
+            return;
+        }
+
+        for (Registration<NotificationListener> listener : toNotify) {
+            try {
+                // FIXME: ensure that notification is immutable
+                listener.getInstance().onNotification(notification);
+            } catch (Exception e) {
+                log.error("Uncaught exception in NotificationListener", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void publish(CompositeNode notification) {
+        sendNotification(notification);
+    }
+
+    @Override
+    public Registration<NotificationListener> addNotificationListener(QName notification, NotificationListener listener) {
+        ListenerRegistration ret = new ListenerRegistration(notification, listener);
+        return ret;
+    }
+
+    private class ListenerRegistration extends AbstractObjectRegistration<NotificationListener> {
+
+        final QName type;
+
+        public ListenerRegistration(QName type, NotificationListener instance) {
+            super(instance);
+            this.type = type;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            listeners.remove(type, this);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend
new file mode 100644 (file)
index 0000000..d67697f
--- /dev/null
@@ -0,0 +1,106 @@
+package org.opendaylight.controller.sal.dom.broker.impl
+
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.yangtools.concepts.Identifiable
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import static com.google.common.base.Preconditions.*;
+import java.util.Map
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration
+import java.util.concurrent.ConcurrentHashMap
+import java.util.Set
+import java.util.Collections
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
+import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.slf4j.LoggerFactory
+
+class RpcRouterImpl implements RpcRouter, Identifiable<String> {
+
+    static val log = LoggerFactory.getLogger(RpcRouterImpl)
+
+    Map<QName, RpcRegistration> implementations = new ConcurrentHashMap();
+
+    @Property
+    val Set<QName> supportedRpcs = Collections.unmodifiableSet(implementations.keySet);
+
+    private val rpcRegistrationListeners = new ListenerRegistry<RpcRegistrationListener>();
+
+    @Property
+    val String identifier;
+
+    new(String name) {
+        _identifier = name;
+    }
+
+    override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+    }
+
+    override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
+        checkNotNull(rpcType, "Rpc Type should not be null");
+        checkNotNull(implementation, "Implementation should not be null.");
+        checkState(!implementations.containsKey(rpcType), "Provider for supplied rpc is already registered.");
+        val reg = new RpcRegistrationImpl(rpcType, implementation, this);
+        implementations.put(rpcType, reg)
+
+        for (listener : rpcRegistrationListeners.listeners) {
+            try {
+                listener.instance.onRpcImplementationAdded(rpcType);
+            } catch (Exception e) {
+                log.error("Unhandled exception during invoking listener", e);
+            }
+        }
+
+        return reg;
+
+    }
+
+    override invokeRpc(QName rpc, CompositeNode input) {
+        checkNotNull(rpc, "Rpc Type should not be null");
+
+        val impl = implementations.get(rpc);
+        checkState(impl !== null, "Provider for supplied rpc is not registered.");
+
+        return impl.instance.invokeRpc(rpc, input);
+    }
+
+    def remove(RpcRegistrationImpl impl) {
+        val existing = implementations.get(impl.type);
+        if (existing == impl) {
+            implementations.remove(impl.type);
+        }
+        for (listener : rpcRegistrationListeners.listeners) {
+            try {
+                listener.instance.onRpcImplementationRemoved(impl.type);
+            } catch (Exception e) {
+                log.error("Unhandled exception during invoking listener", e);
+            }
+        }
+    }
+    
+    override addRpcRegistrationListener(RpcRegistrationListener listener) {
+        rpcRegistrationListeners.register(listener);
+    }
+
+}
+
+class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation> implements RpcRegistration {
+
+    @Property
+    val QName type;
+
+    @Property
+    var RpcRouterImpl router;
+
+    new(QName type, RpcImplementation instance, RpcRouterImpl router) {
+        super(instance)
+        _type = type
+        _router = router
+    }
+
+    override protected removeRegistration() {
+        router.remove(this);
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/NotificationRouter.java
new file mode 100644 (file)
index 0000000..fc42d2c
--- /dev/null
@@ -0,0 +1,21 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import org.opendaylight.controller.sal.core.api.notify.NotificationListener;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface NotificationRouter {
+
+    void publish(CompositeNode notification);
+
+    /**
+     * Registers a notification listener for supplied notification type.
+     * 
+     * @param notification
+     * @param listener
+     */
+    Registration<NotificationListener> addNotificationListener(QName notification,
+            NotificationListener listener);
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RoutedRpcProcessor.java
new file mode 100644 (file)
index 0000000..97c2a15
--- /dev/null
@@ -0,0 +1,27 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface RoutedRpcProcessor extends RpcImplementation {
+
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+
+    public Set<QName> getSupportedRpcs();
+
+    public QName getRpcType();
+    
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+
+    Map<InstanceIdentifier,RpcImplementation> getRoutes();
+    
+    RpcImplementation getDefaultRoute();
+
+}
diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/spi/RpcRouter.java
new file mode 100644 (file)
index 0000000..6886f89
--- /dev/null
@@ -0,0 +1,31 @@
+package org.opendaylight.controller.sal.dom.broker.spi;
+
+import java.util.Set;
+
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+public interface RpcRouter extends RpcProvisionRegistry, RpcImplementation {
+
+    @Override
+    public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+    
+    @Override
+    public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
+            throws IllegalArgumentException;
+    
+    @Override
+    public Set<QName> getSupportedRpcs();
+    
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input);
+
+    ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
+}