*/
package org.opendaylight.controller.sal.dom.broker;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.BrokerService;
-import org.opendaylight.controller.sal.core.api.Consumer;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.osgi.framework.BundleContext;
-import org.slf4j.LoggerFactory;
+import java.util.Collections
+import java.util.HashMap
+import java.util.HashSet
+import java.util.Map
+import java.util.Set
+import java.util.concurrent.Callable
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.Future
+import org.opendaylight.controller.sal.core.api.Broker
+import org.opendaylight.controller.sal.core.api.BrokerService
+import org.opendaylight.controller.sal.core.api.Consumer
+import org.opendaylight.controller.sal.core.api.Provider
+import org.opendaylight.controller.sal.core.spi.BrokerModule
+import org.opendaylight.yangtools.yang.common.QName
+import org.opendaylight.yangtools.yang.common.RpcResult
+import org.opendaylight.yangtools.yang.data.api.CompositeNode
+import org.osgi.framework.BundleContext
+import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.yangtools.concepts.ListenerRegistration
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
public class BrokerImpl implements Broker {
private static val log = LoggerFactory.getLogger(BrokerImpl);
private val Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections.
synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
- // RPC Context
- private val Map<QName, RpcImplementation> rpcImpls = Collections.synchronizedMap(
- new HashMap<QName, RpcImplementation>());
-
// Implementation specific
@Property
- private var ExecutorService executor;
+ private var ExecutorService executor = Executors.newFixedThreadPool(5);
@Property
private var BundleContext bundleContext;
+
+ @Property
+ private var RpcRouter router;
override registerConsumer(Consumer consumer, BundleContext ctx) {
checkPredicates(consumer);
return prov.getServiceForSession(service, session);
}
- // RPC Functionality
- protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) {
- if(rpcImpls.get(rpcType) != null) {
- throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered.");
- }
-
- //TODO: Add notification for availability of Rpc Implementation
- rpcImpls.put(rpcType, implementation);
- }
-
- protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) {
- if(implToRemove == rpcImpls.get(rpcType)) {
- rpcImpls.remove(rpcType);
- }
- }
-
protected def Future<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
- val impl = rpcImpls.get(rpc);
- val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
+ val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable<RpcResult<CompositeNode>>);
return result;
}