Fixed bug in cross-broker RPC routing 00/3700/2
authorTony Tkacik <ttkacik@cisco.com>
Fri, 13 Dec 2013 13:11:03 +0000 (14:11 +0100)
committerTony Tkacik <ttkacik@cisco.com>
Fri, 13 Dec 2013 19:07:53 +0000 (20:07 +0100)
  - Fixed cross-broker (Binding to DOM) and (DOM to Binding)
    rpc routing by adding explicit routing and invocation strategies

  - Lowered reporting level for fingAugmentableArgument to DEBUG
    so it does not polute logs

  - Extracted and converted RpcProviderRegistry implementation
    from BindingAwareBrokerImpl (Xtend) to RpcProviderRegistryImpl
    (java).

Change-Id: I8339abdc864162c70a149cf59fdbdf97093ae8c0
Signed-off-by: Tony Tkacik <ttkacik@cisco.com>
34 files changed:
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/RpcAvailabilityListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/RuntimeMappingModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/RuntimeCodeHelper.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.xtend [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RuntimeCodeGenerator.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/dom/serializer/impl/LazyGeneratedCodecRegistry.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/dom/serializer/impl/RuntimeGeneratedMappingServiceImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/OsgiConsumerContext.xtend
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java [moved from opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentDataServiceConnector.java with 58% similarity]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentMappingService.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RoutingContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RpcContextIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RpcRouter.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/AbstractDataServiceTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/RuntimeCodeGeneratorTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/MessageCapturingFlowService.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/RoutingUtils.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-util/pom.xml
opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/sal/common/util/Rpcs.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/RpcRouterImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java

index 6f1b2c0..4f2b255 100644 (file)
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/config</source>
+                                <source>${project.build.directory}/generated-sources/sal</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/RpcAvailabilityListener.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/RpcAvailabilityListener.java
new file mode 100644 (file)
index 0000000..e25b939
--- /dev/null
@@ -0,0 +1,5 @@
+package org.opendaylight.controller.sal.binding.api;
+
+public interface RpcAvailabilityListener {
+
+}
index 74b6ad8..01dc6b8 100644 (file)
@@ -16,7 +16,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.opendaylight.controller.config.yang.md.sal.binding.statistics.DataBrokerRuntimeMXBeanImpl;\r
 import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;\r
 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;\r
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;\r
 import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;\r
 import org.opendaylight.controller.sal.core.api.Broker;\r
 import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
@@ -63,7 +63,7 @@ public final class DataBrokerImplModule extends
         BindingIndependentMappingService mappingService = getMappingServiceDependency();\r
         \r
         if (domBroker != null && mappingService != null) {\r
-            BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector();\r
+            BindingIndependentConnector runtimeMapping = new BindingIndependentConnector();\r
             runtimeMapping.setMappingService(mappingService);\r
             runtimeMapping.setBaDataService(dataBindingBroker);\r
             domBroker.registerProvider(runtimeMapping, getBundleContext());\r
index 1bf15c1..99b7ed8 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.config.yang.md.sal.binding.impl;
 
 import javassist.ClassPool;
 
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.binding.dom.serializer.impl.RuntimeGeneratedMappingServiceImpl;
 import org.osgi.framework.BundleContext;
 
@@ -50,8 +51,7 @@ public final class RuntimeMappingModule extends
     @Override
     public java.lang.AutoCloseable createInstance() {
         RuntimeGeneratedMappingServiceImpl service = new RuntimeGeneratedMappingServiceImpl();
-        ClassPool pool = new ClassPool(); // Should be default singleton
-        service.setPool(pool);
+        service.setPool(SingletonHolder.CLASS_POOL);
         service.start(getBundleContext());
         return service;
     }
index f0f92da..dff0d21 100644 (file)
@@ -76,7 +76,6 @@ class RuntimeCodeHelper {
         if (field == null) throw new UnsupportedOperationException(
             "Unable to set routing table. Table field does not exists");
         field.set(target,routingTable);
-        
     }
 
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.java
new file mode 100644 (file)
index 0000000..780d0bd
--- /dev/null
@@ -0,0 +1,161 @@
+package org.opendaylight.controller.sal.binding.codegen.impl;
+
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.spi.RpcRouter;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import static org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.HashMap;
+
+import org.opendaylight.controller.sal.binding.spi.RpcRoutingTable;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.RpcImplementation;
+import org.opendaylight.controller.md.sal.common.api.routing.MutableRoutingTable;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class RpcRouterCodegenInstance<T extends RpcService> implements //
+        RpcRouter<T>, RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RpcRouterCodegenInstance.class);
+
+    private T defaultService;
+
+    private final Class<T> serviceType;
+
+    private final T invocationProxy;
+
+    private final Set<Class<? extends BaseIdentity>> contexts;
+
+    private final ListenerRegistry<RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>>> listeners;
+
+    private final Map<Class<? extends BaseIdentity>, RpcRoutingTableImpl<? extends BaseIdentity, T>> routingTables;
+
+    public RpcRouterCodegenInstance(Class<T> type, T routerImpl, Set<Class<? extends BaseIdentity>> contexts,
+            Set<Class<? extends DataContainer>> inputs) {
+        this.listeners = ListenerRegistry.create();
+        this.serviceType = type;
+        this.invocationProxy = routerImpl;
+        this.contexts = ImmutableSet.copyOf(contexts);
+        Map<Class<? extends BaseIdentity>, RpcRoutingTableImpl<? extends BaseIdentity, T>> mutableRoutingTables = new HashMap<>();
+        for (Class<? extends BaseIdentity> ctx : contexts) {
+            RpcRoutingTableImpl<? extends BaseIdentity, T> table = new RpcRoutingTableImpl<>(ctx);
+            Map invokerView = table.getRoutes();
+            setRoutingTable((RpcService) invocationProxy, ctx, invokerView);
+            mutableRoutingTables.put(ctx, table);
+            table.registerRouteChangeListener(this);
+        }
+        this.routingTables = ImmutableMap.copyOf(mutableRoutingTables);
+    }
+
+    @Override
+    public Class<T> getServiceType() {
+        return serviceType;
+    }
+
+    @Override
+    public T getInvocationProxy() {
+        return invocationProxy;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <C extends BaseIdentity> RpcRoutingTable<C, T> getRoutingTable(Class<C> routeContext) {
+        return (RpcRoutingTable<C, T>) routingTables.get(routeContext);
+    }
+
+    @Override
+    public T getDefaultService() {
+        return defaultService;
+    }
+
+    @Override
+    public Set<Class<? extends BaseIdentity>> getContexts() {
+        return contexts;
+    }
+
+    @Override
+    public <L extends RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
+            L listener) {
+        return listeners.registerWithType(listener);
+    }
+
+    @Override
+    public void onRouteChange(RouteChange<Class<? extends BaseIdentity>, InstanceIdentifier<?>> change) {
+        for (ListenerRegistration<RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>>> listener : listeners) {
+            try {
+                listener.getInstance().onRouteChange(change);
+            } catch (Exception e) {
+                LOG.error("Error occured during invoker listener {}", listener.getInstance(), e);
+            }
+        }
+    }
+
+    @Override
+    public T getService(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
+        return routingTables.get(context).getRoute(path);
+    }
+
+    @Override
+    public RoutedRpcRegistration<T> addRoutedRpcImplementation(T service) {
+        return new RoutedRpcRegistrationImpl(service);
+    }
+
+    @Override
+    public RpcRegistration<T> registerDefaultService(T service) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    private class RoutedRpcRegistrationImpl extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
+
+        public RoutedRpcRegistrationImpl(T instance) {
+            super(instance);
+        }
+
+        @Override
+        public Class<T> getServiceType() {
+            return serviceType;
+        }
+
+        @Override
+        public void registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
+            routingTables.get(context).updateRoute(path, getInstance());
+        }
+
+        @Override
+        public void unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
+            routingTables.get(context).removeRoute(path, getInstance());
+
+        }
+
+        @Override
+        public void registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<?> instance) {
+            registerPath(context, instance);
+        }
+
+        @Override
+        public void unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<?> instance) {
+            unregisterPath(context, instance);
+        }
+
+        @Override
+        protected void removeRegistration() {
+
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRouterCodegenInstance.xtend
deleted file mode 100644 (file)
index b6dcde1..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.opendaylight.controller.sal.binding.codegen.impl
-
-import org.opendaylight.yangtools.yang.binding.RpcService
-import org.opendaylight.controller.sal.binding.spi.RpcRouter
-import org.opendaylight.yangtools.yang.binding.BaseIdentity
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
-import java.util.Set
-import java.util.HashMap
-import org.opendaylight.controller.sal.binding.spi.RpcRoutingTable
-import org.opendaylight.yangtools.yang.binding.DataContainer
-import org.opendaylight.yangtools.yang.binding.RpcImplementation
-
-class RpcRouterCodegenInstance<T extends RpcService> implements RpcRouter<T> {
-
-    @Property
-    val T invocationProxy
-
-    @Property
-    val RpcImplementation invokerDelegate;
-
-    @Property
-    val Class<T> serviceType
-
-    @Property
-    val Set<Class<? extends BaseIdentity>> contexts
-
-    @Property
-    val Set<Class<? extends DataContainer>> supportedInputs;
-
-    val routingTables = new HashMap<Class<? extends BaseIdentity>, RpcRoutingTableImpl<? extends BaseIdentity, ? extends RpcService>>;
-
-    @Property
-    var T defaultService
-
-    new(Class<T> type, T routerImpl, Set<Class<? extends BaseIdentity>> contexts,
-        Set<Class<? extends DataContainer>> inputs) {
-        _serviceType = type
-        _invocationProxy = routerImpl
-        _invokerDelegate = routerImpl as RpcImplementation
-        _contexts = contexts
-        _supportedInputs = inputs;
-
-        for (ctx : contexts) {
-            val table = XtendHelper.createRoutingTable(ctx)
-            invocationProxy.setRoutingTable(ctx, table.routes);
-            routingTables.put(ctx, table);
-        }
-    }
-
-    override <C extends BaseIdentity> getRoutingTable(Class<C> table) {
-        routingTables.get(table) as RpcRoutingTable<C,T>
-    }
-
-    override getService(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
-        val table = getRoutingTable(context);
-        return table.getRoute(path);
-    }
-
-    override <T extends DataContainer> invoke(Class<T> type, T input) {
-        return invokerDelegate.invoke(type, input);
-    }
-
-}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.java
new file mode 100644 (file)
index 0000000..f959235
--- /dev/null
@@ -0,0 +1,125 @@
+package org.opendaylight.controller.sal.binding.codegen.impl;
+
+import org.opendaylight.controller.sal.binding.spi.RpcRoutingTable;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Mutable;
+
+class RpcRoutingTableImpl<C extends BaseIdentity, S extends RpcService> //
+implements //
+        Mutable, //
+        RpcRoutingTable<C, S>, //
+        RouteChangePublisher<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+
+    private final Class<C> identifier;
+    private final ConcurrentMap<InstanceIdentifier<?>, S> routes;
+    private final Map<InstanceIdentifier<?>, S> unmodifiableRoutes;
+
+    private RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> listener;
+    private S defaultRoute;
+
+    public RpcRoutingTableImpl(Class<C> identifier) {
+        super();
+        this.identifier = identifier;
+        this.routes = new ConcurrentHashMap<>();
+        this.unmodifiableRoutes = Collections.unmodifiableMap(routes);
+    }
+
+    @Override
+    public void setDefaultRoute(S target) {
+        defaultRoute = target;
+    }
+
+    @Override
+    public S getDefaultRoute() {
+        return defaultRoute;
+    }
+
+    @Override
+    public <L extends RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
+            L listener) {
+        return (ListenerRegistration<L>) new SingletonListenerRegistration<L>(listener);
+    }
+        
+    @Override
+    public Class<C> getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void updateRoute(InstanceIdentifier<?> path, S service) {
+        S previous = this.routes.put(path, service);
+        @SuppressWarnings("rawtypes")
+        RouteChangeListener listenerCapture = listener;
+        if (previous == null && listenerCapture != null) {
+            listenerCapture.onRouteChange(RoutingUtils.announcementChange(identifier, path));
+        }
+    }
+
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    public void removeRoute(InstanceIdentifier<?> path) {
+        S previous = this.routes.remove(path);
+        @SuppressWarnings("rawtypes")
+        RouteChangeListener listenerCapture = listener;
+        if (previous != null && listenerCapture != null) {
+            listenerCapture.onRouteChange(RoutingUtils.removalChange(identifier, path));
+        }
+    }
+    
+    public void removeRoute(InstanceIdentifier<?> path, S service) {
+        @SuppressWarnings("rawtypes")
+        RouteChangeListener listenerCapture = listener;
+        if (routes.remove(path, service) && listenerCapture != null) {
+            listenerCapture.onRouteChange(RoutingUtils.removalChange(identifier, path));
+        }
+    }
+
+    @Override
+    public S getRoute(InstanceIdentifier<?> nodeInstance) {
+        S route = routes.get(nodeInstance);
+        if (route != null) {
+            return route;
+        }
+        return getDefaultRoute();
+    }
+
+    @Override
+    public Map<InstanceIdentifier<?>, S> getRoutes() {
+        return unmodifiableRoutes;
+    }
+    
+    protected void removeAllReferences(S service) {
+        
+    }
+
+    private class SingletonListenerRegistration<L extends RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>>> extends
+            AbstractObjectRegistration<L>
+            implements ListenerRegistration<L> {
+
+        public SingletonListenerRegistration(L instance) {
+            super(instance);
+            listener = instance;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            listener = null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/RpcRoutingTableImpl.xtend
deleted file mode 100644 (file)
index 116a817..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.opendaylight.controller.sal.binding.codegen.impl
-
-import org.opendaylight.controller.sal.binding.spi.RpcRoutingTable
-import org.opendaylight.yangtools.yang.binding.BaseIdentity
-import org.opendaylight.yangtools.yang.binding.RpcService
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import java.util.Map
-import org.opendaylight.yangtools.yang.binding.DataObject
-import java.util.HashMap
-
-class RpcRoutingTableImpl<C extends BaseIdentity,S extends RpcService> implements RpcRoutingTable<C,S>{
-    
-    @Property
-    val Class<C> identifier;
-    
-    @Property
-    var S defaultRoute;
-    
-    @Property
-    val Map<InstanceIdentifier<? extends DataObject>,S> routes;
-    
-    new(Class<C> ident, Map<InstanceIdentifier<? extends DataObject>,S> route) {
-        _identifier = ident
-        _routes = route
-    }
-    
-    new(Class<C> ident) {
-        _identifier = ident
-        _routes = new HashMap
-    }
-    
-    
-    override getRoute(InstanceIdentifier<? extends Object> nodeInstance) {
-        val ret = routes.get(nodeInstance);
-        if(ret !== null) {
-            return ret;
-        }
-        return defaultRoute;
-    }
-    
-    override removeRoute(InstanceIdentifier<? extends Object> path) {
-        routes.remove(path);
-    }
-    
-    @SuppressWarnings("rawtypes")
-    override updateRoute(InstanceIdentifier<? extends Object> path, S service) {
-        routes.put(path as InstanceIdentifier<? extends DataObject>,service);
-    }
-}
\ No newline at end of file
index 90fcbd9..7ebcf02 100644 (file)
@@ -27,7 +27,7 @@ import org.opendaylight.yangtools.yang.binding.Notification
 import static extension org.opendaylight.controller.sal.binding.codegen.YangtoolsMappingHelper.*
 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeSpecification.*
 import java.util.HashSet
-import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
+import static org.opendaylight.yangtools.concepts.util.ClassLoaderUtils.*
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
 import java.util.Set
@@ -37,6 +37,8 @@ import org.opendaylight.yangtools.yang.binding.annotations.QName
 import org.opendaylight.yangtools.yang.binding.DataContainer
 import org.opendaylight.yangtools.yang.binding.RpcImplementation
 import org.opendaylight.controller.sal.binding.codegen.util.JavassistUtils
+import org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils
+import javassist.LoaderClassPath
 
 class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator, NotificationInvokerFactory {
 
@@ -45,40 +47,70 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
     val extension JavassistUtils utils;
     val Map<Class<? extends NotificationListener>, RuntimeGeneratedInvokerPrototype> invokerClasses;
 
-    public new(ClassPool pool) {
+
+    new(ClassPool pool) {
         classPool = pool;
         utils = new JavassistUtils(pool);
         invokerClasses = new WeakHashMap();
         BROKER_NOTIFICATION_LISTENER = org.opendaylight.controller.sal.binding.api.NotificationListener.asCtClass;
+        pool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
     }
 
     override <T extends RpcService> getDirectProxyFor(Class<T> iface) {
-        val supertype = iface.asCtClass
-        val targetCls = createClass(iface.directProxyName, supertype) [
-            field(DELEGATE_FIELD, iface);
-            implementMethodsFrom(supertype) [
-                body = '''
-                {
-                    if(«DELEGATE_FIELD» == null) {
+        val T instance =  withClassLoaderAndLock(iface.classLoader,lock) [|
+            val proxyName = iface.directProxyName;
+            val potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(proxyName)
+            if(potentialClass != null) {
+                return potentialClass.newInstance as T;
+            }
+            val supertype = iface.asCtClass
+            val createdCls = createClass(iface.directProxyName, supertype) [
+                field(DELEGATE_FIELD, iface);
+                implementsType(RpcImplementation.asCtClass)
+                implementMethodsFrom(supertype) [
+                    body = '''
+                    {
+                        if(«DELEGATE_FIELD» == null) {
+                            throw new java.lang.IllegalStateException("No provider is processing supplied message");
+                        }
+                        return ($r) «DELEGATE_FIELD».«it.name»($$);
+                    }
+                    '''
+                ]
+                implementMethodsFrom(RpcImplementation.asCtClass) [
+                    body = '''
+                    {
                         throw new java.lang.IllegalStateException("No provider is processing supplied message");
+                        return ($r) null;
                     }
-                    return ($r) «DELEGATE_FIELD».«it.name»($$);
-                }
-                '''
+                    '''
+                ]
             ]
+            return createdCls.toClass(iface.classLoader).newInstance as T
         ]
-        return targetCls.toClass(iface.classLoader).newInstance as T
+        return instance;
     }
 
     override <T extends RpcService> getRouterFor(Class<T> iface) {
-        val instance = <RpcRouterCodegenInstance<T>>withClassLoaderAndLock(iface.classLoader,lock) [ |
+        val metadata = withClassLoader(iface.classLoader) [|
+            val supertype = iface.asCtClass
+            return supertype.rpcMetadata;
+        ]
+        
+        val instance = <T>withClassLoaderAndLock(iface.classLoader,lock) [ |
             val supertype = iface.asCtClass
-            val metadata = supertype.rpcMetadata;
+            val routerName = iface.routerName;
+            val potentialClass = ClassLoaderUtils.tryToLoadClassWithTCCL(routerName)
+            if(potentialClass != null) {
+                return potentialClass.newInstance as T;
+            }
+            
             val targetCls = createClass(iface.routerName, supertype) [
-                addInterface(RpcImplementation.asCtClass)
+                
                 
                 field(DELEGATE_FIELD, iface)
                 //field(REMOTE_INVOKER_FIELD,iface);
+                implementsType(RpcImplementation.asCtClass)
                 
                 for (ctx : metadata.contexts) {
                     field(ctx.routingTableField, Map)
@@ -105,35 +137,18 @@ class RuntimeCodeGenerator implements org.opendaylight.controller.sal.binding.co
                     }
                 ]
                 implementMethodsFrom(RpcImplementation.asCtClass) [
-                    switch (name) {
-                        case "getSupportedInputs":
-                            body = '''
-                            {
-                                throw new java.lang.UnsupportedOperationException("Not implemented yet");
-                                return ($r) null;
-                            }'''
-                        case "invoke": {
-                            val tmpBody = '''
-                            {
-                                «FOR input : metadata.supportedInputs SEPARATOR " else "»
-                                «val rpcMetadata = metadata.rpcInputs.get(input)»
-                                if(«input.name».class.equals($1)) {
-                                    return ($r) this.«rpcMetadata.methodName»((«input.name») $2);
-                                }
-                                «ENDFOR»
-                                throw new java.lang.IllegalArgumentException("Not supported message type");
-                                return ($r) null;
-                            }
-                            '''
-                            body = tmpBody
-                        }
+                    body = '''
+                    {
+                        throw new java.lang.IllegalStateException("No provider is processing supplied message");
+                        return ($r) null;
                     }
+                    '''
                 ]
             ]
-            val instance = targetCls.toClass(iface.classLoader,iface.protectionDomain).newInstance as T
-            return new RpcRouterCodegenInstance(iface, instance, metadata.contexts,metadata.supportedInputs);
+            return targetCls.toClass(iface.classLoader,iface.protectionDomain).newInstance as T
+            
         ];
-        return instance;
+        return new RpcRouterCodegenInstance(iface, instance, metadata.contexts,metadata.supportedInputs);
     }
 
     private def RpcServiceMetadata getRpcMetadata(CtClass iface) {
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
new file mode 100644 (file)
index 0000000..266293f
--- /dev/null
@@ -0,0 +1,14 @@
+package org.opendaylight.controller.sal.binding.codegen.impl;
+
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+
+import javassist.ClassPool;
+
+public class SingletonHolder {
+
+    public static final ClassPool CLASS_POOL = new ClassPool(); 
+    public static final org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator(CLASS_POOL);
+    public static final RuntimeCodeGenerator RPC_GENERATOR = RPC_GENERATOR_IMPL;
+    public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
+}
index 4b672f1..f1ba584 100644 (file)
@@ -175,7 +175,7 @@ public class LazyGeneratedCodecRegistry implements //
                     });
             return ret;
         } catch (Exception e) {
-            LOG.error("Could not find augmentable for {}", augmentation, e);
+            LOG.debug("Could not find augmentable for {} using {}", augmentation, augmentation.getClassLoader(), e);
             return null;
         }
     }
index 853e62a..1b3acf7 100644 (file)
@@ -43,6 +43,11 @@ import java.util.ArrayList
 import org.opendaylight.yangtools.yang.data.api.Node
 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl
 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl
+import org.opendaylight.yangtools.yang.binding.RpcService
+import java.util.Set
+import org.opendaylight.yangtools.yang.common.QName
+import com.google.common.collect.FluentIterable
+import org.opendaylight.yangtools.binding.generator.util.BindingGeneratorUtil
 
 class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingService, SchemaServiceListener, AutoCloseable {
 
@@ -65,6 +70,9 @@ class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingSer
 
     @Property
     val ConcurrentMap<Type, SchemaNode> typeToSchemaNode = new ConcurrentHashMap();
+    
+    @Property
+    val ConcurrentMap<Type,Set<QName>> serviceTypeToRpc = new ConcurrentHashMap(); 
 
     val promisedTypeDefinitions = HashMultimap.<Type, SettableFuture<GeneratedTypeBuilder>>create;
 
@@ -85,10 +93,17 @@ class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingSer
 
             registry.onModuleContextAdded(schemaContext, entry.key, entry.value);
             binding.pathToType.putAll(entry.value.childNodes)
-            //val module = entry.key;
+            val module = entry.key;
             val context = entry.value;
             updateBindingFor(context.childNodes, schemaContext);
             updateBindingFor(context.cases, schemaContext);
+            val namespace = BindingGeneratorUtil.moduleNamespaceToPackageName(module);
+            
+            if(!module.rpcs.empty) {
+            val rpcs = FluentIterable.from(module.rpcs).transform[QName].toSet
+            val serviceClass = new ReferencedTypeImpl(namespace,BindingGeneratorUtil.parseToClassName(module.name)+"Service");
+                serviceTypeToRpc.put(serviceClass,rpcs);
+            }
 
             val typedefs = context.typedefs;
             for (typedef : typedefs.entrySet) {
@@ -186,15 +201,7 @@ class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingSer
     }
 
     override dataObjectFromDataDom(InstanceIdentifier<? extends DataObject> path, CompositeNode node) {
-        return tryDeserialization[ |
-            if (node == null) {
-                return null;
-            }
-            val targetType = path.targetType
-            val transformer = registry.getCodecForDataObject(targetType);
-            val ret = transformer.deserialize(node)?.value as DataObject;
-            return ret;
-        ]
+         dataObjectFromDataDom(path.targetType,node) as DataObject;
     }
 
     override fromDataDom(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry) {
@@ -243,6 +250,10 @@ class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingSer
             listenerRegistration = ctx.registerService(SchemaServiceListener, this, new Hashtable<String, String>());
         }
     }
+    
+    override getRpcQNamesFor(Class<? extends RpcService> service) {
+        return serviceTypeToRpc.get(new ReferencedTypeImpl(service.package.name,service.simpleName));
+    }
 
     private def getSchemaWithRetry(Type type) {
         val typeDef = typeToSchemaNode.get(type);
@@ -274,5 +285,16 @@ class RuntimeGeneratedMappingServiceImpl implements BindingIndependentMappingSer
     override close() throws Exception {
         listenerRegistration?.unregister();
     }
+    
+    override dataObjectFromDataDom(Class<? extends DataContainer> container, CompositeNode domData) {
+        return tryDeserialization[ |
+            if (domData == null) {
+                return null;
+            }
+            val transformer = registry.getCodecForDataObject(container);
+            val ret = transformer.deserialize(domData)?.value as DataObject;
+            return ret;
+        ]
+    }
 
 }
index 9381a5a..8d3545f 100644 (file)
@@ -13,7 +13,6 @@ import org.opendaylight.yangtools.yang.binding.RpcService
 import javassist.ClassPool
 import org.osgi.framework.BundleContext
 import java.util.Map
-import java.util.HashMap
 import javassist.LoaderClassPath
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
 import java.util.Hashtable
@@ -49,29 +48,14 @@ import java.util.concurrent.Callable
 import java.util.WeakHashMap
 import javax.annotation.concurrent.GuardedBy
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry
 
-class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable {
+class BindingAwareBrokerImpl extends RpcProviderRegistryImpl implements BindingAwareBroker, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
 
     private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
 
-    private static val clsPool = ClassPool.getDefault()
-    public static var RuntimeCodeGenerator generator;
-
-    /**
-     * Map of all Managed Direct Proxies
-     * 
-     */
-    private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
-
-    /**
-     * 
-     * Map of all available Rpc Routers
-     * 
-     * 
-     */
-    private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
-
     @Property
     private var NotificationProviderService notifyBroker
 
@@ -81,43 +65,15 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry,
     @Property
     var BundleContext brokerBundleContext
 
-    ServiceRegistration<NotificationProviderService> notifyProviderRegistration
-
-    ServiceRegistration<NotificationService> notifyConsumerRegistration
-
-    ServiceRegistration<DataProviderService> dataProviderRegistration
-
-    ServiceRegistration<DataBrokerService> dataConsumerRegistration
-
-    private val proxyGenerationLock = new ReentrantLock;
-
-    private val routerGenerationLock = new ReentrantLock;
-
     public new(BundleContext bundleContext) {
         _brokerBundleContext = bundleContext;
     }
 
     def start() {
         log.info("Starting MD-SAL: Binding Aware Broker");
-        initGenerator();
-
-        val executor = Executors.newCachedThreadPool;
-
-        // Initialization of notificationBroker
-        log.info("Starting MD-SAL: Binding Aware Notification Broker");
-
-        log.info("Starting MD-SAL: Binding Aware Data Broker");
-
-        log.info("Starting MD-SAL: Binding Aware Data Broker");
-        log.info("MD-SAL: Binding Aware Broker Started");
     }
 
-    def initGenerator() {
 
-        // YANG Binding Class Loader
-        clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
-        generator = new RuntimeCodeGenerator(clsPool);
-    }
 
     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
         val ctx = consumer.createContext(bundleCtx)
@@ -139,247 +95,9 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry,
     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
         new OsgiProviderContext(providerCtx, this)
     }
-
-    /**
-     * Returns a Managed Direct Proxy for supplied class
-     * 
-     * Managed direct proxy is a generated proxy class conforming to the supplied interface
-     * which delegates all calls to the backing delegate.
-     * 
-     * Proxy does not do any validation, null pointer checks or modifies data in any way, it
-     * is only use to avoid exposing direct references to backing implementation of service.
-     * 
-     * If proxy class does not exist for supplied service class it will be generated automatically.
-     */
-    private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
-        var RpcProxyContext existing = null
-
-        if ((existing = managedProxies.get(service)) != null) {
-            return existing.proxy
-        }
-        return withLock(proxyGenerationLock) [ |
-            val maybeProxy = managedProxies.get(service);
-            if (maybeProxy !== null) {
-                return maybeProxy.proxy;
-            }
-            
-            
-            val proxyInstance = generator.getDirectProxyFor(service)
-            val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
-            val properties = new Hashtable<String, String>()
-            rpcProxyCtx.proxy = proxyInstance as RpcService
-            properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
-            rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
-            managedProxies.put(service, rpcProxyCtx)
-            return rpcProxyCtx.proxy
-        ]
-    }
-
-    private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
-        try {
-            lock.lock();
-            val ret = method.call;
-            return ret;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Registers RPC Implementation
-     * 
-     */
-    override <T extends RpcService> addRpcImplementation(Class<T> type, T service) {
-        checkNotNull(type, "Service type should not be null")
-        checkNotNull(service, "Service type should not be null")
-        
-        val proxy = getManagedDirectProxy(type)
-        checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
-
-        proxy.delegate = service;
-        return new RpcServiceRegistrationImpl<T>(type, service, this);
-    }
-
-    override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
-        checkNotNull(type, "Service type should not be null")
-        checkNotNull(service, "Service type should not be null")
-        
-        val router = resolveRpcRouter(type);
-        checkState(router !== null)
-        return new RoutedRpcRegistrationImpl<T>(service, router, this)
-    }
     
-    override <T extends RpcService> getRpcService(Class<T> service) {
-        checkNotNull(service, "Service should not be null");
-        return getManagedDirectProxy(service) as T;
-    }
-
-    private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
-
-        val router = rpcRouters.get(type);
-        if (router !== null) {
-            return router as RpcRouter<T>;
-        }
-
-        // We created Router
-        return withLock(routerGenerationLock) [ |
-            val maybeRouter = rpcRouters.get(type);
-            if (maybeRouter !== null) {
-                return maybeRouter as RpcRouter<T>;
-            }
-            
-            val newRouter = generator.getRouterFor(type);
-            checkState(newRouter !== null);
-            rpcRouters.put(type, newRouter);
-            // We create / update Direct Proxy for router
-            val proxy = getManagedDirectProxy(type);
-            proxy.delegate = newRouter.invocationProxy
-            return newRouter;
-        ]
-
-    }
-
-    protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
-        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-
-        val router = registration.router;
-        val paths = registration.registeredPaths;
-
-        val routingTable = router.getRoutingTable(context)
-        checkState(routingTable != null);
-
-        // Updating internal structure of registration
-        routingTable.updateRoute(path, registration.instance)
-
-        // Update routing table / send announce to message bus
-        val success = paths.put(context, path);
-    }
-
-    protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
-        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-
-        val router = registration.router;
-        val paths = registration.registeredPaths;
-
-        val routingTable = router.getRoutingTable(context)
-        checkState(routingTable != null);
-
-        // Updating internal structure of registration
-        val target = routingTable.getRoute(path)
-        checkState(target === registration.instance)
-        routingTable.removeRoute(path)
-        checkState(paths.remove(context, path));
-    }
-
-    protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
-
-        val router = registration.router;
-        val paths = registration.registeredPaths;
-
-        for (ctxMap : registration.registeredPaths.entries) {
-            val context = ctxMap.key
-            val routingTable = router.getRoutingTable(context)
-            val path = ctxMap.value
-            routingTable.removeRoute(path)
-        }
-    }
-
-    protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
-
-        val type = registration.serviceType;
-
-        val proxy = managedProxies.get(type);
-        if (proxy.proxy.delegate === registration.instance) {
-            proxy.proxy.delegate = null;
-        }
-    }
-
-    def createDelegate(Class<? extends RpcService> type) {
-        getManagedDirectProxy(type);
-    }
-
-    def getRpcRouters() {
-        return Collections.unmodifiableMap(rpcRouters);
-    }
-
-    override close() {
-        dataConsumerRegistration.unregister()
-        dataProviderRegistration.unregister()
-        notifyConsumerRegistration.unregister()
-        notifyProviderRegistration.unregister()
-    }
-
-}
-
-class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
-
-    @Property
-    private val BindingAwareBrokerImpl broker;
-
-    @Property
-    private val RpcRouter<T> router;
-
-    @Property
-    private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
-
-    private var closed = false;
-
-    new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
-        super(instance)
-        _router = backingRouter;
-        _broker = broker;
-    }
-
-    override protected removeRegistration() {
-        closed = true
-        broker.unregisterRoutedRpcService(this)
-    }
-
-    override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
-        registerPath(context, instance);
-    }
-
-    override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
-        unregisterPath(context, instance);
-    }
-
-    override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-        checkClosed()
-        broker.registerPath(this, context, path);
-    }
-
-    override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
-        checkClosed()
-        broker.unregisterPath(this, context, path);
-    }
-
-    override getServiceType() {
-        return router.serviceType;
-    }
-
-    private def checkClosed() {
-        if (closed)
-            throw new IllegalStateException("Registration was closed.");
-    }
-
-}
-
-class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
-
-    private var BindingAwareBrokerImpl broker;
-
-    @Property
-    val Class<T> serviceType;
-
-    public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
-        super(service);
-        this._serviceType = type;
-        this.broker = broker;
-    }
-
-    override protected removeRegistration() {
-        broker.unregisterRpcService(this);
-        broker = null;
+    override close() throws Exception {
+        
     }
 
-}
+}
\ No newline at end of file
index e8b3850..b10c06f 100644 (file)
@@ -20,8 +20,8 @@ import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
 import org.opendaylight.yangtools.concepts.Registration\r
 import org.opendaylight.yangtools.yang.binding.Notification\r
-import org.slf4j.LoggerFactory\r
-\r
+import org.slf4j.LoggerFactory\rimport org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder
+
 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
 \r
     val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
@@ -29,6 +29,11 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab
     @Property\r
     var ExecutorService executor;\r
 \r
+    new() {\r
+        listeners = HashMultimap.create()\r
+    }\r
+\r
+    @Deprecated\r
     new(ExecutorService executor) {\r
         listeners = HashMultimap.create()\r
         this.executor = executor;\r
@@ -100,7 +105,7 @@ class NotificationBrokerImpl implements NotificationProviderService, AutoCloseab
 \r
     override registerNotificationListener(\r
         org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
-        val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);\r
+        val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);\r
         for (notifyType : invoker.supportedNotifications) {\r
             listeners.put(notifyType, invoker.invocationProxy)\r
         }\r
index bc53108..644c50b 100644 (file)
@@ -45,8 +45,7 @@ class OsgiConsumerContext implements ConsumerContext {
                 val ref = services.iterator().next() as ServiceReference<T>;
                 return bundleContext.getService(ref) as T;
             } else {
-                broker.createDelegate(module);
-                return getRpcService(module);
+                return broker.getRpcService(module);
             }
         } catch (InvalidSyntaxException e) {
             log.error("Created filter was invalid:", e.message, e)
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java
new file mode 100644 (file)
index 0000000..bc86288
--- /dev/null
@@ -0,0 +1,166 @@
+package org.opendaylight.controller.sal.binding.impl;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.WeakHashMap;
+
+import javax.swing.tree.ExpandVetoException;
+
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
+import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper;
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.spi.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.spi.RpcRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+
+import static com.google.common.base.Preconditions.*;
+
+public class RpcProviderRegistryImpl implements //
+        RpcProviderRegistry, //
+        RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
+
+    private RuntimeCodeGenerator rpcFactory = SingletonHolder.RPC_GENERATOR_IMPL;
+
+    private final Map<Class<? extends RpcService>, RpcService> publicProxies = new WeakHashMap<>();
+    private final Map<Class<? extends RpcService>, RpcRouter<?>> rpcRouters = new WeakHashMap<>();
+    private final ListenerRegistry<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> routeChangeListeners = ListenerRegistry
+            .create();
+
+    @Override
+    public final <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type,
+            T implementation) throws IllegalStateException {
+        return getRpcRouter(type).addRoutedRpcImplementation(implementation);
+    }
+
+    @Override
+    public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
+            throws IllegalStateException {
+        RpcRouter<T> potentialRouter = (RpcRouter<T>) rpcRouters.get(type);
+        if (potentialRouter != null) {
+            checkState(potentialRouter.getDefaultService() == null,
+                    "Default service for routed RPC already registered.");
+            return potentialRouter.registerDefaultService(implementation);
+        }
+        T publicProxy = getRpcService(type);
+        RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
+        checkState(currentDelegate == null, "Rpc service is already registered");
+        RuntimeCodeHelper.setDelegate(publicProxy, implementation);
+        return new RpcProxyRegistration<T>(type, implementation, this);
+    }
+
+    @Override
+    public final <T extends RpcService> T getRpcService(Class<T> type) {
+
+        RpcService potentialProxy = publicProxies.get(type);
+        if (potentialProxy != null) {
+            return (T) potentialProxy;
+        }
+        T proxy = rpcFactory.getDirectProxyFor(type);
+        publicProxies.put(type, proxy);
+        return proxy;
+    }
+
+    private <T extends RpcService> RpcRouter<T> getRpcRouter(Class<T> type) {
+        RpcRouter<?> potentialRouter = rpcRouters.get(type);
+        if (potentialRouter != null) {
+            return (RpcRouter<T>) potentialRouter;
+        }
+        RpcRouter<T> router = rpcFactory.getRouterFor(type);
+        router.registerRouteChangeListener(new RouteChangeForwarder(type));
+        RuntimeCodeHelper.setDelegate(getRpcService(type), router.getInvocationProxy());
+        rpcRouters.put(type, router);
+        return router;
+    }
+
+    public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(
+            L listener) {
+        return (ListenerRegistration<L>) routeChangeListeners.register(listener);
+    }
+
+    public RuntimeCodeGenerator getRpcFactory() {
+        return rpcFactory;
+    }
+
+    public void setRpcFactory(RuntimeCodeGenerator rpcFactory) {
+        this.rpcFactory = rpcFactory;
+    }
+
+    private class RouteChangeForwarder<T extends RpcService> implements
+            RouteChangeListener<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
+
+        private final Class<T> type;
+
+        public RouteChangeForwarder(Class<T> type) {
+            this.type = type;
+        }
+
+        @Override
+        public void onRouteChange(RouteChange<Class<? extends BaseIdentity>, InstanceIdentifier<?>> change) {
+            Map<RpcContextIdentifier, Set<InstanceIdentifier<?>>> announcements = new HashMap<>();
+            for (Entry<Class<? extends BaseIdentity>, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements()
+                    .entrySet()) {
+                RpcContextIdentifier key = RpcContextIdentifier.contextFor(type, entry.getKey());
+                announcements.put(key, entry.getValue());
+            }
+            Map<RpcContextIdentifier, Set<InstanceIdentifier<?>>> removals = new HashMap<>();
+            for (Entry<Class<? extends BaseIdentity>, Set<InstanceIdentifier<?>>> entry : change.getRemovals()
+                    .entrySet()) {
+                RpcContextIdentifier key = RpcContextIdentifier.contextFor(type, entry.getKey());
+                removals.put(key, entry.getValue());
+            }
+            RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> toPublish = RoutingUtils
+                    .<RpcContextIdentifier, InstanceIdentifier<?>> change(announcements, removals);
+            for (ListenerRegistration<RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> listener : routeChangeListeners) {
+                try {
+                    listener.getInstance().onRouteChange(toPublish);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    public static class RpcProxyRegistration<T extends RpcService> extends AbstractObjectRegistration<T> implements
+            RpcRegistration<T> {
+
+        private final Class<T> serviceType;
+        private RpcProviderRegistryImpl registry;
+
+        public RpcProxyRegistration(Class<T> type, T service, RpcProviderRegistryImpl registry) {
+            super(service);
+            serviceType = type;
+        }
+
+        @Override
+        public Class<T> getServiceType() {
+            return serviceType;
+        }
+
+        @Override
+        protected void removeRegistration() {
+            if (registry != null) {
+                T publicProxy = registry.getRpcService(serviceType);
+                RpcService currentDelegate = RuntimeCodeHelper.getDelegate(publicProxy);
+                if (currentDelegate == getInstance()) {
+                    RuntimeCodeHelper.setDelegate(publicProxy, null);
+                }
+                registry = null;
+            }
+        }
+    }
+}
@@ -1,42 +1,78 @@
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
+import java.lang.ref.WeakReference;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+
 import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
+import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
+import org.opendaylight.controller.sal.binding.spi.RpcContextIdentifier;
+import org.opendaylight.controller.sal.binding.spi.RpcRouter;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils;
 import org.opendaylight.yangtools.yang.binding.Augmentable;
 import org.opendaylight.yangtools.yang.binding.Augmentation;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.BindingMapping;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcInput;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BindingIndependentDataServiceConnector implements //
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+
+import static com.google.common.base.Preconditions.*;
+import static org.opendaylight.yangtools.concepts.util.ClassLoaderUtils.*;
+
+public class BindingIndependentConnector implements //
         RuntimeDataProvider, //
-        Provider, AutoCloseable {
+        Provider, //
+        AutoCloseable {
 
-    private final Logger LOG = LoggerFactory.getLogger(BindingIndependentDataServiceConnector.class);
+    private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class);
 
     private static final InstanceIdentifier<? extends DataObject> ROOT = InstanceIdentifier.builder().toInstance();
 
@@ -59,26 +95,41 @@ public class BindingIndependentDataServiceConnector implements //
 
     private Registration<DataCommitHandler<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode>> biCommitHandlerRegistration;
 
+    private RpcProvisionRegistry biRpcRegistry;
+    private RpcProviderRegistryImpl baRpcRegistry;
+
+    private ListenerRegistration<DomToBindingRpcForwardingManager> domToBindingRpcManager;
+    // private ListenerRegistration<BindingToDomRpcForwardingManager>
+    // bindingToDomRpcManager;
+
+    private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+
+        @Override
+        public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
+            return mappingService.toDataDom(input);
+        }
+
+    };
+
     @Override
     public DataObject readOperationalData(InstanceIdentifier<? extends DataObject> path) {
         try {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier biPath = mappingService.toDataDom(path);
-            
-            
+
             CompositeNode result = biDataService.readOperationalData(biPath);
             Class<? extends DataObject> targetType = path.getTargetType();
-            
-            if(Augmentation.class.isAssignableFrom(targetType)) {
+
+            if (Augmentation.class.isAssignableFrom(targetType)) {
                 path = mappingService.fromDataDom(biPath);
                 Class<? extends Augmentation<?>> augmentType = (Class<? extends Augmentation<?>>) targetType;
                 DataObject parentTo = mappingService.dataObjectFromDataDom(path, result);
-                if(parentTo instanceof Augmentable<?>) {
+                if (parentTo instanceof Augmentable<?>) {
                     return (DataObject) ((Augmentable) parentTo).getAugmentation(augmentType);
                 }
-                
+
             }
             return mappingService.dataObjectFromDataDom(path, result);
-            
+
         } catch (DeserializationException e) {
             throw new IllegalStateException(e);
         }
@@ -183,11 +234,24 @@ public class BindingIndependentDataServiceConnector implements //
         this.baDataService = baDataService;
     }
 
+    public RpcProviderRegistry getRpcRegistry() {
+        return baRpcRegistry;
+    }
+
+    public void setRpcRegistry(RpcProviderRegistryImpl rpcRegistry) {
+        this.baRpcRegistry = rpcRegistry;
+    }
+
     public void start() {
         baDataService.registerDataReader(ROOT, this);
         baCommitHandlerRegistration = baDataService.registerCommitHandler(ROOT, bindingToDomCommitHandler);
         biCommitHandlerRegistration = biDataService.registerCommitHandler(ROOT_BI, domToBindingCommitHandler);
         baDataService.registerCommitHandlerListener(domToBindingCommitHandler);
+
+        if (baRpcRegistry != null && biRpcRegistry != null) {
+            domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager());
+
+        }
     }
 
     public void setMappingService(BindingIndependentMappingService mappingService) {
@@ -205,6 +269,14 @@ public class BindingIndependentDataServiceConnector implements //
         start();
     }
 
+    public <T extends RpcService> void onRpcRouterCreated(Class<T> serviceType, RpcRouter<T> router) {
+
+    }
+
+    public void setDomRpcRegistry(RpcProvisionRegistry registry) {
+        biRpcRegistry = registry;
+    }
+
     @Override
     public void close() throws Exception {
         if (baCommitHandlerRegistration != null) {
@@ -358,4 +430,204 @@ public class BindingIndependentDataServiceConnector implements //
             return forwardedTransaction;
         }
     }
+
+    private class DomToBindingRpcForwardingManager implements
+            RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>> {
+
+        private final Map<Class<? extends RpcService>, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>();
+
+        @Override
+        public void onRouteChange(RouteChange<RpcContextIdentifier, InstanceIdentifier<?>> change) {
+            for (Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry : change.getAnnouncements().entrySet()) {
+                bindingRoutesAdded(entry);
+            }
+        }
+
+        private void bindingRoutesAdded(Entry<RpcContextIdentifier, Set<InstanceIdentifier<?>>> entry) {
+            Class<? extends BaseIdentity> context = entry.getKey().getRoutingContext();
+            Class<? extends RpcService> service = entry.getKey().getRpcService();
+            if (context != null) {
+                getRpcForwarder(service, context).registerPaths(context, service, entry.getValue());
+            }
+        }
+
+        private DomToBindingRpcForwarder getRpcForwarder(Class<? extends RpcService> service,
+                Class<? extends BaseIdentity> context) {
+            DomToBindingRpcForwarder potential = forwarders.get(service);
+            if (potential != null) {
+                return potential;
+            }
+            if (context == null) {
+                potential = new DomToBindingRpcForwarder(service);
+            } else {
+                potential = new DomToBindingRpcForwarder(service, context);
+            }
+            forwarders.put(service, potential);
+            return potential;
+        }
+
+    }
+
+    private class DomToBindingRpcForwarder implements RpcImplementation {
+
+        private final Set<QName> supportedRpcs;
+        private final WeakReference<Class<? extends RpcService>> rpcServiceType;
+        private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+
+        public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
+            this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
+            this.supportedRpcs = mappingService.getRpcQNamesFor(service);
+            for (QName rpc : supportedRpcs) {
+                biRpcRegistry.addRpcImplementation(rpc, this);
+            }
+            registrations = ImmutableSet.of();
+        }
+
+        public DomToBindingRpcForwarder(Class<? extends RpcService> service, Class<? extends BaseIdentity> context) {
+            this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
+            this.supportedRpcs = mappingService.getRpcQNamesFor(service);
+            registrations = new HashSet<>();
+            for (QName rpc : supportedRpcs) {
+                registrations.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this));
+            }
+            registrations = ImmutableSet.copyOf(registrations);
+        }
+
+        public void registerPaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
+                Set<InstanceIdentifier<?>> set) {
+            QName ctx = BindingReflections.findQName(context);
+            for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
+                    toDOMInstanceIdentifier)) {
+                for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
+                    reg.registerPath(ctx, path);
+                }
+            }
+        }
+
+        public void removePaths(Class<? extends BaseIdentity> context, Class<? extends RpcService> service,
+                Set<InstanceIdentifier<?>> set) {
+            QName ctx = BindingReflections.findQName(context);
+            for (org.opendaylight.yangtools.yang.data.api.InstanceIdentifier path : FluentIterable.from(set).transform(
+                    toDOMInstanceIdentifier)) {
+                for (org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration reg : registrations) {
+                    reg.unregisterPath(ctx, path);
+                }
+            }
+        }
+
+        @Override
+        public Set<QName> getSupportedRpcs() {
+            return supportedRpcs;
+        }
+
+        @Override
+        public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode domInput) {
+            checkArgument(rpc != null);
+            checkArgument(domInput != null);
+
+            Class<? extends RpcService> rpcType = rpcServiceType.get();
+            checkState(rpcType != null);
+            RpcService rpcService = baRpcRegistry.getRpcService(rpcType);
+            checkState(rpcService != null);
+            CompositeNode domUnwrappedInput = domInput.getFirstCompositeByName(QName.create(rpc, "input"));
+            try {
+                return resolveInvocationStrategy(rpc, rpcType).invokeOn(rpcService, domUnwrappedInput);
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        private RpcInvocationStrategy resolveInvocationStrategy(final QName rpc,
+                final Class<? extends RpcService> rpcType) throws Exception {
+            return ClassLoaderUtils.withClassLoader(rpcType.getClassLoader(), new Callable<RpcInvocationStrategy>() {
+                @Override
+                public RpcInvocationStrategy call() throws Exception {
+                    String methodName = BindingMapping.getMethodName(rpc);
+                    Method targetMethod = null;
+                    for (Method possibleMethod : rpcType.getMethods()) {
+                        if (possibleMethod.getName().equals(methodName)
+                                && BindingReflections.isRpcMethod(possibleMethod)) {
+                            targetMethod = possibleMethod;
+                            break;
+                        }
+                    }
+                    checkState(targetMethod != null, "Rpc method not found");
+                    Optional<Class<?>> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod);
+                    Optional<Class<? extends DataContainer>> inputClass = BindingReflections
+                            .resolveRpcInputClass(targetMethod);
+
+                    RpcInvocationStrategy strategy = null;
+                    if (outputClass.isPresent()) {
+                        if (inputClass.isPresent()) {
+                            strategy = new DefaultInvocationStrategy(targetMethod, outputClass.get(), inputClass.get());
+                        } else {
+                            strategy = new NoInputNoOutputInvocationStrategy(targetMethod);
+                        }
+                    } else {
+                        strategy = null;
+                    }
+                    return strategy;
+                }
+
+            });
+        }
+    }
+
+    private abstract class RpcInvocationStrategy {
+
+        protected final Method targetMethod;
+
+        public RpcInvocationStrategy(Method targetMethod) {
+            this.targetMethod = targetMethod;
+        }
+
+        public abstract RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput)
+                throws Exception;
+
+        public RpcResult<CompositeNode> invokeOn(RpcService rpcService, CompositeNode domInput) throws Exception {
+            return uncheckedInvoke(rpcService, domInput);
+        }
+    }
+
+    private class DefaultInvocationStrategy extends RpcInvocationStrategy {
+
+        @SuppressWarnings("rawtypes")
+        private WeakReference<Class> inputClass;
+
+        @SuppressWarnings("rawtypes")
+        private WeakReference<Class> outputClass;
+
+        public DefaultInvocationStrategy(Method targetMethod, Class<?> outputClass,
+                Class<? extends DataContainer> inputClass) {
+            super(targetMethod);
+            this.outputClass = new WeakReference(outputClass);
+            this.inputClass = new WeakReference(inputClass);
+        }
+
+        @Override
+        public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
+            DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
+            Future<RpcResult<?>> result = (Future<RpcResult<?>>) targetMethod.invoke(rpcService, bindingInput);
+            if (result == null) {
+                return Rpcs.getRpcResult(false);
+            }
+            RpcResult<?> bindingResult = result.get();
+            return Rpcs.getRpcResult(true);
+        }
+
+    }
+
+    private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy {
+
+        public NoInputNoOutputInvocationStrategy(Method targetMethod) {
+            super(targetMethod);
+        }
+
+        public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
+            Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
+            RpcResult<Void> bindingResult = result.get();
+            return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors());
+        }
+
+    }
 }
index b1983fe..e16ae48 100644 (file)
@@ -1,10 +1,13 @@
 package org.opendaylight.controller.sal.binding.impl.connect.dom;
 
 import java.util.Map.Entry;
+import java.util.Set;
 
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
 public interface BindingIndependentMappingService {
@@ -19,5 +22,9 @@ public interface BindingIndependentMappingService {
     DataObject dataObjectFromDataDom(InstanceIdentifier<? extends DataObject> path, CompositeNode result) throws DeserializationException;
 
     InstanceIdentifier<?> fromDataDom(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier entry)  throws DeserializationException;
+    
+    Set<QName> getRpcQNamesFor(Class<? extends RpcService> service);
+
+    DataContainer dataObjectFromDataDom(Class<? extends DataContainer> inputClass, CompositeNode domInput);
 
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RoutingContext.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RoutingContext.java
new file mode 100644 (file)
index 0000000..49e056b
--- /dev/null
@@ -0,0 +1,5 @@
+package org.opendaylight.controller.sal.binding.spi;
+
+public class RoutingContext {
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RpcContextIdentifier.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/spi/RpcContextIdentifier.java
new file mode 100644 (file)
index 0000000..33569eb
--- /dev/null
@@ -0,0 +1,65 @@
+package org.opendaylight.controller.sal.binding.spi;
+
+import org.opendaylight.yangtools.concepts.Immutable;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.RpcService;
+
+public final  class RpcContextIdentifier implements Immutable{
+
+    public final Class<? extends RpcService> rpcService;
+    public final Class<? extends BaseIdentity> routingContext;
+   
+    private RpcContextIdentifier(Class<? extends RpcService> rpcService, Class<? extends BaseIdentity> routingContext) {
+        super();
+        this.rpcService = rpcService;
+        this.routingContext = routingContext;
+    }
+
+    public Class<? extends RpcService> getRpcService() {
+        return rpcService;
+    }
+
+    public Class<? extends BaseIdentity> getRoutingContext() {
+        return routingContext;
+    }
+    
+    public static final RpcContextIdentifier contextForGlobalRpc(Class<? extends RpcService> serviceType) {
+        return new RpcContextIdentifier(serviceType, null);
+    }
+    
+    public static final RpcContextIdentifier contextFor(Class<? extends RpcService> serviceType,Class<? extends BaseIdentity> routingContext) {
+        return new RpcContextIdentifier(serviceType, routingContext);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((routingContext == null) ? 0 : routingContext.hashCode());
+        result = prime * result + ((rpcService == null) ? 0 : rpcService.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        RpcContextIdentifier other = (RpcContextIdentifier) obj;
+        if (routingContext == null) {
+            if (other.routingContext != null)
+                return false;
+        } else if (!routingContext.equals(other.routingContext))
+            return false;
+        if (rpcService == null) {
+            if (other.rpcService != null)
+                return false;
+        } else if (!rpcService.equals(other.rpcService))
+            return false;
+        return true;
+    }
+
+}
index 7db90b6..621d048 100644 (file)
@@ -9,6 +9,9 @@ package org.opendaylight.controller.sal.binding.spi;
 
 import java.util.Set;
 
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.yangtools.yang.binding.BaseIdentity;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcImplementation;
@@ -26,7 +29,8 @@ import org.opendaylight.yangtools.yang.binding.RpcService;
  *            Type of RpcService for which router provides routing information
  *            and route selection.
  */
-public interface RpcRouter<T extends RpcService> extends RpcImplementation{
+public interface RpcRouter<T extends RpcService> extends //
+        RouteChangePublisher<Class<? extends BaseIdentity>, InstanceIdentifier<?>> {
 
     /**
      * Returns a type of RpcService which is served by this instance of router.
@@ -72,12 +76,11 @@ public interface RpcRouter<T extends RpcService> extends RpcImplementation{
      * @return default instance responsible for processing RPCs.
      */
     T getDefaultService();
-    
-    /**
-     * 
-     */
-    void setDefaultService(T service);
 
     Set<Class<? extends BaseIdentity>> getContexts();
+    
+    RoutedRpcRegistration<T> addRoutedRpcImplementation(T service);
+    
+    RpcRegistration<T> registerDefaultService(T service);
 
 }
index a7a70c2..633506f 100644 (file)
@@ -12,7 +12,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;
 import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;
 import org.opendaylight.controller.sal.binding.test.util.BindingBrokerTestFactory;
 import org.opendaylight.controller.sal.binding.test.util.BindingTestContext;
index 1a97bd6..9c9841a 100644 (file)
@@ -65,9 +65,6 @@ public class RuntimeCodeGeneratorTest {
         assertNotNull(product);
         assertNotNull(product.getInvocationProxy());
 
-        assertNotNull(product.getSupportedInputs());
-        assertTrue(product.getSupportedInputs().contains(SimpleInput.class));
-        assertTrue(product.getSupportedInputs().contains(InheritedContextInput.class));
         assertEquals("2 fields should be generated.", 2, product.getInvocationProxy().getClass().getFields().length);
 
         verifyRouting(product);
index 4e611c5..3217a31 100644 (file)
@@ -7,15 +7,22 @@ import java.util.Set;
 
 import javassist.ClassPool;
 
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.dom.serializer.impl.RuntimeGeneratedMappingServiceImpl;
+import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl;
 import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;
+import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;
 import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;
 import org.opendaylight.controller.sal.binding.test.AbstractDataServiceTest;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.data.DataStore;
+import org.opendaylight.controller.sal.dom.broker.BrokerImpl;
 import org.opendaylight.controller.sal.dom.broker.impl.DataStoreStatsWrapper;
 import org.opendaylight.controller.sal.dom.broker.impl.HashMapDataStore;
+import org.opendaylight.controller.sal.dom.broker.impl.RpcRouterImpl;
 import org.opendaylight.controller.sal.dom.broker.impl.SchemaAwareDataStoreAdapter;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -31,7 +38,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 import static com.google.common.base.Preconditions.*;
 
-public class BindingTestContext {
+public class BindingTestContext implements AutoCloseable {
     
     
     public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier TREE_ROOT = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
@@ -40,10 +47,15 @@ public class BindingTestContext {
     private static final Logger LOG = LoggerFactory.getLogger(BindingTestContext.class);
     
     private RuntimeGeneratedMappingServiceImpl mappingServiceImpl;
+    
+    
+    private BindingAwareBrokerImpl baBrokerImpl;
     private DataBrokerImpl baDataImpl;
+    private NotificationBrokerImpl baNotifyImpl;
+    private BindingIndependentConnector baConnectDataServiceImpl;
+
     private org.opendaylight.controller.sal.dom.broker.DataBrokerImpl biDataImpl;
-    
-    private BindingIndependentDataServiceConnector connectorServiceImpl;
+    private BrokerImpl biBrokerImpl;
     private HashMapDataStore rawDataStore;
     private SchemaAwareDataStoreAdapter schemaAwareDataStore;
     private DataStoreStatsWrapper dataStoreStats;
@@ -56,6 +68,7 @@ public class BindingTestContext {
     private final ClassPool classPool;
 
     private final boolean startWithSchema;
+
     
     protected BindingTestContext(ListeningExecutorService executor, ClassPool classPool, boolean startWithSchema) {
         this.executor = executor;
@@ -93,15 +106,29 @@ public class BindingTestContext {
         baDataImpl.setExecutor(executor);
     }
     
+    public void startBindingBroker() {
+        checkState(executor != null,"Executor needs to be set");
+        checkState(baDataImpl != null,"Binding Data Broker must be started");
+        checkState(baNotifyImpl != null, "Notification Service must be started");
+        baBrokerImpl = new BindingAwareBrokerImpl(null);
+        
+        baBrokerImpl.setDataBroker(baDataImpl);
+        baBrokerImpl.setNotifyBroker(baNotifyImpl);
+        
+        baBrokerImpl.start();
+    }
+    
     public void startBindingToDomDataConnector() {
         checkState(baDataImpl != null,"Binding Data Broker needs to be started");
         checkState(biDataImpl != null,"DOM Data Broker needs to be started.");
         checkState(mappingServiceImpl != null,"DOM Mapping Service needs to be started.");
-        connectorServiceImpl = new BindingIndependentDataServiceConnector();
-        connectorServiceImpl.setBaDataService(baDataImpl);
-        connectorServiceImpl.setBiDataService(biDataImpl);
-        connectorServiceImpl.setMappingService(mappingServiceImpl);
-        connectorServiceImpl.start();
+        baConnectDataServiceImpl = new BindingIndependentConnector();
+        baConnectDataServiceImpl.setRpcRegistry(baBrokerImpl);
+        baConnectDataServiceImpl.setDomRpcRegistry(getDomRpcRegistry());
+        baConnectDataServiceImpl.setBaDataService(baDataImpl);
+        baConnectDataServiceImpl.setBiDataService(biDataImpl);
+        baConnectDataServiceImpl.setMappingService(mappingServiceImpl);
+        baConnectDataServiceImpl.start();
     }
     
     public void startBindingToDomMappingService() {
@@ -149,8 +176,11 @@ public class BindingTestContext {
     
     public void start() {
         startBindingDataBroker();
+        startBindingNotificationBroker();
+        startBindingBroker();
         startDomDataBroker();
         startDomDataStore();
+        startDomBroker();
         startBindingToDomMappingService();
         startBindingToDomDataConnector();
         if(startWithSchema) {
@@ -158,6 +188,19 @@ public class BindingTestContext {
         }
     }
 
+    private void startDomBroker() {
+        checkState(executor != null);
+        biBrokerImpl = new BrokerImpl();
+        biBrokerImpl.setExecutor(executor);
+        biBrokerImpl.setRouter(new RpcRouterImpl("test"));
+    }
+
+    public void startBindingNotificationBroker() {
+        checkState(executor != null);
+        baNotifyImpl = new NotificationBrokerImpl(executor);
+        
+    }
+
     public void loadYangSchemaFromClasspath() {
         String[] files = getAllYangFilesOnClasspath();
         updateYangSchema(files);
@@ -196,4 +239,24 @@ public class BindingTestContext {
                 dataStoreStats.getRequestCommitCount(), dataStoreStats.getRequestCommitTotalTime(),
                 dataStoreStats.getRequestCommitAverageTime());
     }
+
+    public RpcProviderRegistry getBindingRpcRegistry() {
+        return baBrokerImpl;
+    }
+
+    public RpcProvisionRegistry getDomRpcRegistry() {
+        if(biBrokerImpl == null) {
+            return null;
+        }
+        return biBrokerImpl.getRouter();
+    }
+    
+    public RpcImplementation getDomRpcInvoker() {
+        return biBrokerImpl.getRouter();
+    }
+    
+    @Override
+    public void close() throws Exception {
+        
+    }
 }
diff --git a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerRpcTest.java
new file mode 100644 (file)
index 0000000..92a0a3a
--- /dev/null
@@ -0,0 +1,159 @@
+package org.opendaylight.controller.sal.binding.test.connect.dom;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.binding.test.util.BindingBrokerTestFactory;
+import org.opendaylight.controller.sal.binding.test.util.BindingTestContext;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeFlowRemoved;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+
+import static junit.framework.Assert.*;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public class CrossBrokerRpcTest {
+
+    protected RpcProviderRegistry baRpcRegistry;
+    protected RpcProvisionRegistry biRpcRegistry;
+    private BindingTestContext testContext;
+    private RpcImplementation biRpcInvoker;
+    private MessageCapturingFlowService flowService;
+
+    public static final NodeId NODE_A = new NodeId("a");
+    public static final NodeId NODE_B = new NodeId("b");
+    public static final NodeId NODE_C = new NodeId("c");
+    public static final NodeId NODE_D = new NodeId("d");
+
+    public static final InstanceIdentifier<Node> BA_NODE_A_ID = createBANodeIdentifier(NODE_A);
+    public static final InstanceIdentifier<Node> BA_NODE_B_ID = createBANodeIdentifier(NODE_B);
+    public static final InstanceIdentifier<Node> BA_NODE_C_ID = createBANodeIdentifier(NODE_C);
+    public static final InstanceIdentifier<Node> BA_NODE_D_ID = createBANodeIdentifier(NODE_D);
+
+    public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_A_ID = createBINodeIdentifier(NODE_A);
+    public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_B_ID = createBINodeIdentifier(NODE_B);
+    public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_C_ID = createBINodeIdentifier(NODE_C);
+    public static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier BI_NODE_D_ID = createBINodeIdentifier(NODE_D);
+
+    private static final QName NODE_ID_QNAME = QName.create(Node.QNAME, "id");
+    private static final QName ADD_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "add-flow");
+    private static final QName REMOVE_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "remove-flow");
+    private static final QName UPDATE_FLOW_QNAME = QName.create(NodeFlowRemoved.QNAME, "update-flow");
+
+    @Before
+    public void setup() {
+        BindingBrokerTestFactory testFactory = new BindingBrokerTestFactory();
+        testFactory.setExecutor(MoreExecutors.sameThreadExecutor());
+        testFactory.setStartWithParsedSchema(true);
+        testContext = testFactory.getTestContext();
+
+        testContext.start();
+        baRpcRegistry = testContext.getBindingRpcRegistry();
+        biRpcRegistry = testContext.getDomRpcRegistry();
+        biRpcInvoker = testContext.getDomRpcInvoker();
+        assertNotNull(baRpcRegistry);
+        assertNotNull(biRpcRegistry);
+
+        flowService = MessageCapturingFlowService.create(baRpcRegistry);
+
+    }
+
+    @Test
+    public void bindingRoutedRpcProvider_DomInvokerTest() {
+
+        flowService//
+                .registerPath(NodeContext.class, BA_NODE_A_ID) //
+                .registerPath(NodeContext.class, BA_NODE_B_ID) //
+                .setAddFlowResult(addFlowResult(true, 10));
+
+        SalFlowService baFlowInvoker = baRpcRegistry.getRpcService(SalFlowService.class);
+        assertNotSame(flowService, baFlowInvoker);
+
+        AddFlowInput addFlowA = addFlow(BA_NODE_A_ID) //
+                .setPriority(100).setBarrier(true).build();
+
+        CompositeNode addFlowDom = toDomRpc(ADD_FLOW_QNAME, addFlowA);
+        assertNotNull(addFlowDom);
+        RpcResult<CompositeNode> domResult = biRpcInvoker.invokeRpc(ADD_FLOW_QNAME, addFlowDom);
+        assertNotNull(domResult);
+        assertTrue("DOM result is successful.", domResult.isSuccessful());
+        assertTrue("Bidning Add Flow RPC was captured.", flowService.getReceivedAddFlows().containsKey(BA_NODE_A_ID));
+        assertEquals(addFlowA, flowService.getReceivedAddFlows().get(BA_NODE_A_ID).iterator().next());
+    }
+
+    public void bindingRpcInvoker_DomRoutedProviderTest() {
+
+    }
+
+    private CompositeNode toDomRpcInput(DataObject addFlowA) {
+        return testContext.getBindingToDomMappingService().toDataDom(addFlowA);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        testContext.close();
+    }
+
+    private static InstanceIdentifier<Node> createBANodeIdentifier(NodeId node) {
+        return InstanceIdentifier.builder(Nodes.class).child(Node.class, new NodeKey(node)).toInstance();
+    }
+
+    private static org.opendaylight.yangtools.yang.data.api.InstanceIdentifier createBINodeIdentifier(NodeId node) {
+        return org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder().node(Nodes.QNAME)
+                .nodeWithKey(Node.QNAME, NODE_ID_QNAME, node.getValue()).toInstance();
+    }
+
+    private Future<RpcResult<AddFlowOutput>> addFlowResult(boolean success, long xid) {
+        AddFlowOutput output = new AddFlowOutputBuilder() //
+                .setTransactionId(new TransactionId(BigInteger.valueOf(xid))).build();
+        RpcResult<AddFlowOutput> result = Rpcs.getRpcResult(success, output, ImmutableList.<RpcError> of());
+        return Futures.immediateFuture(result);
+    }
+
+    private static AddFlowInputBuilder addFlow(InstanceIdentifier<Node> nodeId) {
+        AddFlowInputBuilder builder = new AddFlowInputBuilder();
+        builder.setNode(new NodeRef(nodeId));
+        return builder;
+    }
+
+    private CompositeNode toDomRpc(QName rpcName, AddFlowInput addFlowA) {
+        return new CompositeNodeTOImpl(rpcName, null,
+                Collections.<org.opendaylight.yangtools.yang.data.api.Node<?>> singletonList(toDomRpcInput(addFlowA)));
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/MessageCapturingFlowService.java b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/MessageCapturingFlowService.java
new file mode 100644 (file)
index 0000000..74f0781
--- /dev/null
@@ -0,0 +1,122 @@
+package org.opendaylight.controller.sal.binding.test.connect.dom;
+
+import static junit.framework.Assert.assertNotNull;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yangtools.yang.binding.BaseIdentity;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+public class MessageCapturingFlowService implements SalFlowService, AutoCloseable {
+
+    private Future<RpcResult<AddFlowOutput>> addFlowResult;
+    private Future<RpcResult<RemoveFlowOutput>> removeFlowResult;
+    private Future<RpcResult<UpdateFlowOutput>> updateFlowResult;
+
+    private final Multimap<InstanceIdentifier<?>, AddFlowInput> receivedAddFlows = HashMultimap.create();
+    private final Multimap<InstanceIdentifier<?>, RemoveFlowInput> receivedRemoveFlows = HashMultimap.create();
+    private final Multimap<InstanceIdentifier<?>, UpdateFlowInput> receivedUpdateFlows = HashMultimap.create();
+    private RoutedRpcRegistration<SalFlowService> registration;
+
+    @Override
+    public Future<RpcResult<AddFlowOutput>> addFlow(AddFlowInput arg0) {
+        receivedAddFlows.put(arg0.getNode().getValue(), arg0);
+        return addFlowResult;
+    }
+
+    @Override
+    public Future<RpcResult<RemoveFlowOutput>> removeFlow(RemoveFlowInput arg0) {
+        receivedRemoveFlows.put(arg0.getNode().getValue(), arg0);
+        return removeFlowResult;
+    }
+
+    @Override
+    public Future<RpcResult<UpdateFlowOutput>> updateFlow(UpdateFlowInput arg0) {
+        receivedUpdateFlows.put(arg0.getNode().getValue(), arg0);
+        return updateFlowResult;
+    }
+
+    public Future<RpcResult<AddFlowOutput>> getAddFlowResult() {
+        return addFlowResult;
+    }
+
+    public MessageCapturingFlowService setAddFlowResult(Future<RpcResult<AddFlowOutput>> addFlowResult) {
+        this.addFlowResult = addFlowResult;
+        return this;
+    }
+
+    public Future<RpcResult<RemoveFlowOutput>> getRemoveFlowResult() {
+        return removeFlowResult;
+    }
+
+    public MessageCapturingFlowService setRemoveFlowResult(Future<RpcResult<RemoveFlowOutput>> removeFlowResult) {
+        this.removeFlowResult = removeFlowResult;
+        return this;
+    }
+
+    public Future<RpcResult<UpdateFlowOutput>> getUpdateFlowResult() {
+        return updateFlowResult;
+    }
+
+    public MessageCapturingFlowService setUpdateFlowResult(Future<RpcResult<UpdateFlowOutput>> updateFlowResult) {
+        this.updateFlowResult = updateFlowResult;
+        return this;
+    }
+
+    public Multimap<InstanceIdentifier<?>, AddFlowInput> getReceivedAddFlows() {
+        return receivedAddFlows;
+    }
+
+    public Multimap<InstanceIdentifier<?>, RemoveFlowInput> getReceivedRemoveFlows() {
+        return receivedRemoveFlows;
+    }
+
+    public Multimap<InstanceIdentifier<?>, UpdateFlowInput> getReceivedUpdateFlows() {
+        return receivedUpdateFlows;
+    }
+
+    public MessageCapturingFlowService registerTo(RpcProviderRegistry registry) {
+        registration = registry.addRoutedRpcImplementation(SalFlowService.class, this);
+        assertNotNull(registration);
+        return this;
+    }
+
+    public void close() throws Exception {
+        registration.close();
+    }
+
+    public MessageCapturingFlowService registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
+        registration.registerPath(context, path);
+        return this;
+    }
+
+    public MessageCapturingFlowService unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<?> path) {
+        registration.unregisterPath(context, path);
+        return this;
+    }
+    
+    public static MessageCapturingFlowService create() {
+        return new MessageCapturingFlowService();
+    }
+    
+    public static MessageCapturingFlowService create(RpcProviderRegistry registry) {
+        MessageCapturingFlowService ret = new MessageCapturingFlowService();
+        ret.registerTo(registry);
+        return ret;
+    }
+    
+    
+}
\ No newline at end of file
index 89851c9..bee29a1 100644 (file)
@@ -4,5 +4,5 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 public interface RouteChangePublisher<C,P> {
 
-    ListenerRegistration<RouteChangeListener<C,P>> registerRouteChangeListener(RouteChangeListener<C,P> listener);
+    <L extends RouteChangeListener<C,P>> ListenerRegistration<L> registerRouteChangeListener(L listener);
 }
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/RoutingUtils.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/routing/RoutingUtils.java
new file mode 100644 (file)
index 0000000..60d0cdf
--- /dev/null
@@ -0,0 +1,92 @@
+package org.opendaylight.controller.md.sal.common.impl.routing;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class RoutingUtils {
+    
+    public static <C,P> RouteChange<C,P> removalChange(C context,P path) {
+        final ImmutableMap<C, Set<P>> announcements = ImmutableMap.<C,Set<P>>of();
+        final ImmutableMap<C, Set<P>> removals = ImmutableMap.<C,Set<P>>of(context, ImmutableSet.of(path));
+        return new RouteChangeImpl<C,P>(announcements, removals);
+    }
+    
+    public static <C,P> RouteChange<C,P> announcementChange(C context,P path) {
+        final ImmutableMap<C, Set<P>> announcements = ImmutableMap.<C,Set<P>>of(context, ImmutableSet.of(path));
+        final ImmutableMap<C, Set<P>> removals = ImmutableMap.<C,Set<P>>of();
+        return new RouteChangeImpl<C,P>(announcements, removals);
+    }
+    
+    
+    public static <C,P> RouteChange<C,P> change(Map<C, Set<P>> announcements,
+            Map<C, Set<P>> removals) {
+        final ImmutableMap<C, Set<P>> immutableAnnouncements = ImmutableMap.<C,Set<P>>copyOf(announcements);
+        final ImmutableMap<C, Set<P>> immutableRemovals = ImmutableMap.<C,Set<P>>copyOf(removals);
+        return new RouteChangeImpl<C,P>(immutableAnnouncements, immutableRemovals);
+    }
+    
+    
+    private static class RouteChangeImpl<C,P> implements RouteChange<C, P> {
+        private final Map<C, Set<P>> removal;
+        private final Map<C, Set<P>> announcement;
+
+        public RouteChangeImpl(ImmutableMap<C, Set<P>> removal, ImmutableMap<C, Set<P>> announcement) {
+            super();
+            this.removal = removal;
+            this.announcement = announcement;
+        }
+
+        @Override
+        public Map<C, Set<P>> getAnnouncements() {
+            return announcement;
+        }
+        
+        @Override
+        public Map<C, Set<P>> getRemovals() {
+            return removal;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((announcement == null) ? 0 : announcement.hashCode());
+            result = prime * result + ((removal == null) ? 0 : removal.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            RouteChangeImpl other = (RouteChangeImpl) obj;
+            if (announcement == null) {
+                if (other.announcement != null)
+                    return false;
+            } else if (!announcement.equals(other.announcement))
+                return false;
+            if (removal == null) {
+                if (other.removal != null) {
+                    return false;
+                }
+            } else if (!removal.equals(other.removal))
+                return false;
+            return true;
+        }
+    }
+
+
+    
+}
index 7e8069a..ff15e72 100644 (file)
       <artifactId>concepts</artifactId>
       <version>0.1.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
   <packaging>bundle</packaging>
index 54e1a06..356ec8f 100644 (file)
@@ -11,17 +11,31 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+
+import org.opendaylight.yangtools.concepts.Immutable;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.common.collect.ImmutableList;
+
 public class Rpcs {
+    
+    public static <T> RpcResult<T> getRpcResult(boolean successful) {
+        RpcResult<T> ret = new RpcResultTO<T>(successful, null, ImmutableList.<RpcError>of());
+        return ret;
+    }
+    
     public static <T> RpcResult<T> getRpcResult(boolean successful, T result,
             Collection<RpcError> errors) {
         RpcResult<T> ret = new RpcResultTO<T>(successful, result, errors);
         return ret;
     }
 
-    private static class RpcResultTO<T> implements RpcResult<T>, Serializable {
+    public static <T> RpcResult<T> getRpcResult(boolean successful, Collection<RpcError> errors) {
+        return new RpcResultTO<T>(successful, null, errors);
+    }
+    
+    private static class RpcResultTO<T> implements RpcResult<T>, Serializable, Immutable {
 
         private final Collection<RpcError> errors;
         private final T result;
@@ -31,8 +45,7 @@ public class Rpcs {
                 Collection<RpcError> errors) {
             this.successful = successful;
             this.result = result;
-            this.errors = Collections.unmodifiableList(new ArrayList<RpcError>(
-                    errors));
+            this.errors = ImmutableList.copyOf(errors);
         }
 
         @Override
index 7ef594b..8f17998 100644 (file)
@@ -29,8 +29,10 @@ import org.slf4j.LoggerFactory
 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.yangtools.concepts.ListenerRegistration
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
+import org.opendaylight.controller.sal.core.api.RpcImplementation
 
-public class BrokerImpl implements Broker, AutoCloseable {
+public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
 
     // Broker Generic Context
@@ -115,4 +117,12 @@ public class BrokerImpl implements Broker, AutoCloseable {
         deactivator?.close();
     }
     
+    override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
+        router.addRpcImplementation(rpcType,implementation);
+    }
+    
+    override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+        router.addRoutedRpcImplementation(rpcType,implementation);
+    }
+    
 }
index d8680ce..5ee19a0 100644 (file)
@@ -15,6 +15,8 @@ import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
 import org.slf4j.LoggerFactory
 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
+import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 
 class RpcRouterImpl implements RpcRouter, Identifiable<String> {
 
@@ -35,6 +37,20 @@ class RpcRouterImpl implements RpcRouter, Identifiable<String> {
     }
 
     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
+                checkNotNull(rpcType, "Rpc Type should not be null");
+        checkNotNull(implementation, "Implementation should not be null.");
+        val reg = new RoutedRpcRegistrationImpl(rpcType, implementation, this);
+        implementations.put(rpcType, reg)
+
+        for (listener : rpcRegistrationListeners.listeners) {
+            try {
+                listener.instance.onRpcImplementationAdded(rpcType);
+            } catch (Exception e) {
+                log.error("Unhandled exception during invoking listener", e);
+            }
+        }
+
+        return reg;
     }
 
     override addRpcImplementation(QName rpcType, RpcImplementation implementation) throws IllegalArgumentException {
@@ -102,5 +118,23 @@ class RpcRegistrationImpl extends AbstractObjectRegistration<RpcImplementation>
     override protected removeRegistration() {
         router.remove(this);
     }
+}
+class RoutedRpcRegistrationImpl extends RpcRegistrationImpl implements RoutedRpcRegistration {
+
+
+    new(QName type, RpcImplementation instance, RpcRouterImpl router) {
+        super(type,instance,router)
+    }
 
+    override protected removeRegistration() {
+        router.remove(this);
+    }
+    override registerPath(QName context, InstanceIdentifier path) {
+        //
+        
+    }
+
+    override unregisterPath(QName context, InstanceIdentifier path) {
+        //
+    }
 }
index 4f4fadc..75e9649 100644 (file)
@@ -151,19 +151,20 @@ public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataS
         this.schema = null;
     }
 
-    protected CompositeNode mergeData(InstanceIdentifier path, CompositeNode stored, CompositeNode modified, boolean config) {
+    protected CompositeNode mergeData(InstanceIdentifier path, CompositeNode stored, CompositeNode modified,
+            boolean config) {
         long startTime = System.nanoTime();
         try {
-        DataSchemaNode node = schemaNodeFor(path);
-        return YangDataOperations.merge(node,stored,modified,config);
+            DataSchemaNode node = schemaNodeFor(path);
+            return YangDataOperations.merge(node, stored, modified, config);
         } finally {
-            //System.out.println("Merge time: " + ((System.nanoTime() - startTime) / 1000.0d));
+            // System.out.println("Merge time: " + ((System.nanoTime() -
+            // startTime) / 1000.0d));
         }
     }
-    
-    
+
     private DataSchemaNode schemaNodeFor(InstanceIdentifier path) {
-        checkState(schema != null,"YANG Schema is not available");
+        checkState(schema != null, "YANG Schema is not available");
         return YangSchemaUtils.getSchemaNode(schema, path);
     }
 
@@ -171,16 +172,16 @@ public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataS
             DataModification<InstanceIdentifier, CompositeNode> original) {
         // NOOP for now
         NormalizedDataModification normalized = new NormalizedDataModification(original);
-        for (Entry<InstanceIdentifier,CompositeNode> entry : original.getUpdatedConfigurationData().entrySet()) {
+        for (Entry<InstanceIdentifier, CompositeNode> entry : original.getUpdatedConfigurationData().entrySet()) {
             normalized.putConfigurationData(entry.getKey(), entry.getValue());
         }
-        for (Entry<InstanceIdentifier,CompositeNode> entry : original.getUpdatedOperationalData().entrySet()) {
+        for (Entry<InstanceIdentifier, CompositeNode> entry : original.getUpdatedOperationalData().entrySet()) {
             normalized.putOperationalData(entry.getKey(), entry.getValue());
         }
         for (InstanceIdentifier entry : original.getRemovedConfigurationData()) {
             normalized.removeConfigurationData(entry);
         }
-        for(InstanceIdentifier entry : original.getRemovedOperationalData()) {
+        for (InstanceIdentifier entry : original.getRemovedOperationalData()) {
             normalized.removeOperationalData(entry);
         }
         return normalized;
@@ -284,7 +285,7 @@ public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataS
             }
         }
     }
-    
+
     private class NormalizedDataModification extends AbstractDataModification<InstanceIdentifier, CompositeNode> {
 
         private Object identifier;
@@ -295,12 +296,12 @@ public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataS
             identifier = original;
             status = TransactionStatus.NEW;
         }
-        
+
         @Override
         public Object getIdentifier() {
             return this.identifier;
         }
-        
+
         @Override
         public TransactionStatus getStatus() {
             return status;
@@ -310,18 +311,19 @@ public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataS
         public Future<RpcResult<TransactionStatus>> commit() {
             throw new UnsupportedOperationException("Commit should not be invoked on this");
         }
-        
+
         @Override
-        protected CompositeNode mergeConfigurationData(InstanceIdentifier path,CompositeNode stored, CompositeNode modified) {
-            return mergeData(path,stored, modified,true);
+        protected CompositeNode mergeConfigurationData(InstanceIdentifier path, CompositeNode stored,
+                CompositeNode modified) {
+            return mergeData(path, stored, modified, true);
         }
-        
+
         @Override
-        protected CompositeNode mergeOperationalData(InstanceIdentifier path,CompositeNode stored, CompositeNode modified) {
+        protected CompositeNode mergeOperationalData(InstanceIdentifier path, CompositeNode stored,
+                CompositeNode modified) {
             // TODO Auto-generated method stub
-            return mergeData(path,stored,modified,false);
+            return mergeData(path, stored, modified, false);
         }
-        
     }
 
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.