X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingAwareBrokerImpl.xtend;h=1762aac090a0eac4a29cdac122d986298d773ec1;hp=31d5d0126fc8115546d1d1b7709671e946b2e026;hb=f062dc05cc7caaf0c1811856370f1c9e2f1e5c34;hpb=582da55f82ee5d83af2e7a327044c62ef3a76285 diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend index 31d5d0126f..1762aac090 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend @@ -44,17 +44,18 @@ import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils import java.util.concurrent.Executors import java.util.Collections import org.opendaylight.yangtools.yang.binding.DataObject -import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.Callable +import java.util.WeakHashMap +import javax.annotation.concurrent.GuardedBy class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) - private InstanceIdentifier root = InstanceIdentifier.builder().toInstance(); - private val clsPool = ClassPool.getDefault() - private var RuntimeCodeGenerator generator; - + private static val clsPool = ClassPool.getDefault() + public static var RuntimeCodeGenerator generator; /** * Map of all Managed Direct Proxies @@ -68,28 +69,29 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { * * */ - private val Map, RpcRouter> rpcRouters = new ConcurrentHashMap(); + private val Map, RpcRouter> rpcRouters = new WeakHashMap(); @Property - private var NotificationBrokerImpl notifyBroker - + private var NotificationProviderService notifyBroker + @Property - private var DataBrokerImpl dataBroker - + private var DataProviderService dataBroker + @Property var BundleContext brokerBundleContext - + ServiceRegistration notifyProviderRegistration - + ServiceRegistration notifyConsumerRegistration - + ServiceRegistration dataProviderRegistration - + ServiceRegistration dataConsumerRegistration - - ConnectorActivator connectorActivator - - + + private val proxyGenerationLock = new ReentrantLock; + + private val routerGenerationLock = new ReentrantLock; + public new(BundleContext bundleContext) { _brokerBundleContext = bundleContext; } @@ -99,27 +101,13 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { initGenerator(); val executor = Executors.newCachedThreadPool; + // Initialization of notificationBroker log.info("Starting MD-SAL: Binding Aware Notification Broker"); - notifyBroker = new NotificationBrokerImpl(executor); - notifyBroker.invokerFactory = generator.invokerFactory; log.info("Starting MD-SAL: Binding Aware Data Broker"); - dataBroker = new DataBrokerImpl(); - dataBroker.executor = executor; - val brokerProperties = newProperties(); - - log.info("Starting MD-SAL: Binding Aware Data Broker"); - notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, - brokerProperties) - notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) - dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties) - dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties) - - connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext); - connectorActivator.start(); log.info("MD-SAL: Binding Aware Broker Started"); } @@ -163,20 +151,37 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { * If proxy class does not exist for supplied service class it will be generated automatically. */ private def getManagedDirectProxy(Class service) { - var RpcProxyContext existing = null + if ((existing = managedProxies.get(service)) != null) { return existing.proxy } - val proxyInstance = generator.getDirectProxyFor(service) - val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) - val properties = new Hashtable() - rpcProxyCtx.proxy = proxyInstance as RpcService - - properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY - rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) - managedProxies.put(service, rpcProxyCtx) - return rpcProxyCtx.proxy + return withLock(proxyGenerationLock) [ | + val maybeProxy = managedProxies.get(service); + if (maybeProxy !== null) { + return maybeProxy.proxy; + } + + + val proxyInstance = generator.getDirectProxyFor(service) + val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) + val properties = new Hashtable() + rpcProxyCtx.proxy = proxyInstance as RpcService + properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY + rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) + managedProxies.put(service, rpcProxyCtx) + return rpcProxyCtx.proxy + ] + } + + private static def T withLock(ReentrantLock lock, Callable method) { + try { + lock.lock(); + val ret = method.call; + return ret; + } finally { + lock.unlock(); + } } /** @@ -190,7 +195,7 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { val osgiReg = context.bundleContext.registerService(type, service, properties); proxy.delegate = service; - return new RpcServiceRegistrationImpl(type, service, osgiReg,this); + return new RpcServiceRegistrationImpl(type, service, osgiReg, this); } def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, @@ -208,14 +213,20 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { } // We created Router - val newRouter = generator.getRouterFor(type); - checkState(newRouter !== null); - rpcRouters.put(type, newRouter); - - // We create / update Direct Proxy for router - val proxy = getManagedDirectProxy(type); - proxy.delegate = newRouter.invocationProxy - return newRouter; + return withLock(routerGenerationLock) [ | + val maybeRouter = rpcRouters.get(type); + if (maybeRouter !== null) { + return maybeRouter as RpcRouter; + } + + val newRouter = generator.getRouterFor(type); + checkState(newRouter !== null); + rpcRouters.put(type, newRouter); + // We create / update Direct Proxy for router + val proxy = getManagedDirectProxy(type); + proxy.delegate = newRouter.invocationProxy + return newRouter; + ] } @@ -230,8 +241,8 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { // Updating internal structure of registration routingTable.updateRoute(path, registration.instance) + // Update routing table / send announce to message bus - val success = paths.put(context, path); } @@ -263,32 +274,32 @@ class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { routingTable.removeRoute(path) } } - + protected def void unregisterRpcService(RpcServiceRegistrationImpl registration) { val type = registration.serviceType; - + val proxy = managedProxies.get(type); - if(proxy.proxy.delegate === registration.instance) { + if (proxy.proxy.delegate === registration.instance) { proxy.proxy.delegate = null; } } - + def createDelegate(Class type) { getManagedDirectProxy(type); } - + def getRpcRouters() { return Collections.unmodifiableMap(rpcRouters); } - + override close() { dataConsumerRegistration.unregister() dataProviderRegistration.unregister() notifyConsumerRegistration.unregister() notifyProviderRegistration.unregister() } - + } class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { @@ -332,7 +343,7 @@ class RoutedRpcRegistrationImpl extends AbstractObjectRegi checkClosed() broker.unregisterPath(this, context, path); } - + override getServiceType() { return router.serviceType; } @@ -343,24 +354,25 @@ class RoutedRpcRegistrationImpl extends AbstractObjectRegi } } -class RpcServiceRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { + +class RpcServiceRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { val ServiceRegistration osgiRegistration; private var BindingAwareBrokerImpl broker; - + @Property val Class serviceType; - public new(Class type, T service, ServiceRegistration osgiReg,BindingAwareBrokerImpl broker) { + public new(Class type, T service, ServiceRegistration osgiReg, BindingAwareBrokerImpl broker) { super(service); this._serviceType = type; this.osgiRegistration = osgiReg; - this.broker= broker; + this.broker = broker; } override protected removeRegistration() { broker.unregisterRpcService(this); broker = null; } - + }