X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fdom%2Fbroker%2FBrokerImpl.xtend;h=855ad9bd328d19c79b30e09ba4e485e0d1918044;hb=c19766901dce1994ef2432f356b32d539b6c43cc;hp=83dda5902dee4e6fd819debabcaecd12ca0413e8;hpb=aec966831cce36d6719e54c72ac564a3d1cf3d69;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend index 83dda5902d..855ad9bd32 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend @@ -7,29 +7,28 @@ */ package org.opendaylight.controller.sal.dom.broker; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.BrokerService; -import org.opendaylight.controller.sal.core.api.Consumer; -import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.spi.BrokerModule; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.osgi.framework.BundleContext; -import org.slf4j.LoggerFactory; +import java.util.Collections +import java.util.HashMap +import java.util.HashSet +import java.util.Map +import java.util.Set +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.Future +import org.opendaylight.controller.sal.core.api.Broker +import org.opendaylight.controller.sal.core.api.BrokerService +import org.opendaylight.controller.sal.core.api.Consumer +import org.opendaylight.controller.sal.core.api.Provider +import org.opendaylight.controller.sal.core.spi.BrokerModule +import org.opendaylight.yangtools.yang.common.QName +import org.opendaylight.yangtools.yang.common.RpcResult +import org.opendaylight.yangtools.yang.data.api.CompositeNode +import org.osgi.framework.BundleContext +import org.slf4j.LoggerFactory +import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter import org.opendaylight.yangtools.concepts.ListenerRegistration import org.opendaylight.controller.sal.core.api.RpcRegistrationListener -import org.opendaylight.controller.md.sal.common.impl.ListenerRegistry public class BrokerImpl implements Broker { private static val log = LoggerFactory.getLogger(BrokerImpl); @@ -42,17 +41,14 @@ public class BrokerImpl implements Broker { private val Map, BrokerModule> serviceProviders = Collections. synchronizedMap(new HashMap, BrokerModule>()); - - private val rpcRegistrationListeners = new ListenerRegistry(); - // RPC Context - private val Map rpcImpls = Collections.synchronizedMap( - new HashMap()); - // Implementation specific @Property private var ExecutorService executor = Executors.newFixedThreadPool(5); @Property private var BundleContext bundleContext; + + @Property + private var RpcRouter router; override registerConsumer(Consumer consumer, BundleContext ctx) { checkPredicates(consumer); @@ -95,42 +91,8 @@ public class BrokerImpl implements Broker { return prov.getServiceForSession(service, session); } - // RPC Functionality - protected def void addRpcImplementation(QName rpcType, RpcImplementation implementation) { - if(rpcImpls.get(rpcType) != null) { - throw new IllegalStateException("Implementation for rpc " + rpcType + " is already registered."); - } - - - rpcImpls.put(rpcType, implementation); - - - for(listener : rpcRegistrationListeners.listeners) { - try { - listener.instance.onRpcImplementationAdded(rpcType); - } catch (Exception e){ - log.error("Unhandled exception during invoking listener",e); - } - } - } - - protected def void removeRpcImplementation(QName rpcType, RpcImplementation implToRemove) { - if(implToRemove == rpcImpls.get(rpcType)) { - rpcImpls.remove(rpcType); - } - - for(listener : rpcRegistrationListeners.listeners) { - try { - listener.instance.onRpcImplementationRemoved(rpcType); - } catch (Exception e){ - log.error("Unhandled exception during invoking listener",e); - } - } - } - protected def Future> invokeRpc(QName rpc, CompositeNode input) { - val impl = rpcImpls.get(rpc); - val result = executor.submit([|impl.invokeRpc(rpc, input)] as Callable>); + val result = executor.submit([|router.invokeRpc(rpc, input)] as Callable>); return result; } @@ -171,12 +133,4 @@ public class BrokerImpl implements Broker { sessions.remove(consumerContextImpl); providerSessions.remove(consumerContextImpl); } - - protected def getSupportedRpcs() { - rpcImpls.keySet; - } - - def ListenerRegistration addRpcRegistrationListener(RpcRegistrationListener listener) { - rpcRegistrationListeners.register(listener); - } }