X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingAwareBrokerImpl.xtend;h=1762aac090a0eac4a29cdac122d986298d773ec1;hb=a87db38d47967eae159c5be17ab334bb6a4edffc;hp=740ae887b0588bcc7648aaba49bb7f63eef5b53a;hpb=8b6075992f1e18eb678ee4e50e13b3d2d1397a85;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend index 740ae887b0..1762aac090 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend @@ -40,14 +40,22 @@ import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.yang.binding.BaseIdentity import com.google.common.collect.Multimap import com.google.common.collect.HashMultimap -import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.* +import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.* import java.util.concurrent.Executors - -class BindingAwareBrokerImpl implements BindingAwareBroker { +import java.util.Collections +import org.opendaylight.yangtools.yang.binding.DataObject +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.Callable +import java.util.WeakHashMap +import javax.annotation.concurrent.GuardedBy + +class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) - private val clsPool = ClassPool.getDefault() - private var RuntimeCodeGenerator generator; + private InstanceIdentifier root = InstanceIdentifier.builder().toInstance(); + + private static val clsPool = ClassPool.getDefault() + public static var RuntimeCodeGenerator generator; /** * Map of all Managed Direct Proxies @@ -61,30 +69,46 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { * * */ - private val Map, RpcRouter> rpcRouters = new ConcurrentHashMap(); + private val Map, RpcRouter> rpcRouters = new WeakHashMap(); - private var NotificationBrokerImpl notifyBroker - private var DataBrokerImpl dataBroker - private var ServiceRegistration notifyBrokerRegistration + @Property + private var NotificationProviderService notifyBroker + + @Property + private var DataProviderService dataBroker @Property var BundleContext brokerBundleContext + ServiceRegistration notifyProviderRegistration + + ServiceRegistration notifyConsumerRegistration + + ServiceRegistration dataProviderRegistration + + ServiceRegistration dataConsumerRegistration + + private val proxyGenerationLock = new ReentrantLock; + + private val routerGenerationLock = new ReentrantLock; + + public new(BundleContext bundleContext) { + _brokerBundleContext = bundleContext; + } + def start() { + log.info("Starting MD-SAL: Binding Aware Broker"); initGenerator(); val executor = Executors.newCachedThreadPool; + // Initialization of notificationBroker - notifyBroker = new NotificationBrokerImpl(executor); - notifyBroker.invokerFactory = generator.invokerFactory; - dataBroker = new DataBrokerImpl(); - val brokerProperties = newProperties(); - notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, - brokerProperties) - brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) - brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties) - brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties) + log.info("Starting MD-SAL: Binding Aware Notification Broker"); + + log.info("Starting MD-SAL: Binding Aware Data Broker"); + log.info("Starting MD-SAL: Binding Aware Data Broker"); + log.info("MD-SAL: Binding Aware Broker Started"); } def initGenerator() { @@ -127,20 +151,37 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { * If proxy class does not exist for supplied service class it will be generated automatically. */ private def getManagedDirectProxy(Class service) { - var RpcProxyContext existing = null + if ((existing = managedProxies.get(service)) != null) { return existing.proxy } - val proxyInstance = generator.getDirectProxyFor(service) - val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) - val properties = new Hashtable() - rpcProxyCtx.proxy = proxyInstance as RpcService - - properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY - rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) - managedProxies.put(service, rpcProxyCtx) - return rpcProxyCtx.proxy + return withLock(proxyGenerationLock) [ | + val maybeProxy = managedProxies.get(service); + if (maybeProxy !== null) { + return maybeProxy.proxy; + } + + + val proxyInstance = generator.getDirectProxyFor(service) + val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) + val properties = new Hashtable() + rpcProxyCtx.proxy = proxyInstance as RpcService + properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY + rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) + managedProxies.put(service, rpcProxyCtx) + return rpcProxyCtx.proxy + ] + } + + private static def T withLock(ReentrantLock lock, Callable method) { + try { + lock.lock(); + val ret = method.call; + return ret; + } finally { + lock.unlock(); + } } /** @@ -150,16 +191,11 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { def registerRpcImplementation(Class type, T service, OsgiProviderContext context, Hashtable properties) { val proxy = getManagedDirectProxy(type) - checkState(proxy.delegate === null, "The Service for type {} is already registered", type) + checkState(proxy.delegate === null, "The Service for type %s is already registered", type) val osgiReg = context.bundleContext.registerService(type, service, properties); proxy.delegate = service; - return new RpcServiceRegistrationImpl(type, service, osgiReg); - } - - def RpcRegistration registerMountedRpcImplementation(Class type, T service, - InstanceIdentifier identifier, OsgiProviderContext context) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + return new RpcServiceRegistrationImpl(type, service, osgiReg, this); } def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, @@ -177,14 +213,20 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { } // We created Router - val newRouter = generator.getRouterFor(type); - checkState(newRouter !== null); - rpcRouters.put(type, newRouter); - - // We create / update Direct Proxy for router - val proxy = getManagedDirectProxy(type); - proxy.delegate = newRouter.invocationProxy - return newRouter; + return withLock(routerGenerationLock) [ | + val maybeRouter = rpcRouters.get(type); + if (maybeRouter !== null) { + return maybeRouter as RpcRouter; + } + + val newRouter = generator.getRouterFor(type); + checkState(newRouter !== null); + rpcRouters.put(type, newRouter); + // We create / update Direct Proxy for router + val proxy = getManagedDirectProxy(type); + proxy.delegate = newRouter.invocationProxy + return newRouter; + ] } @@ -199,6 +241,8 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { // Updating internal structure of registration routingTable.updateRoute(path, registration.instance) + + // Update routing table / send announce to message bus val success = paths.put(context, path); } @@ -228,9 +272,34 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { val routingTable = router.getRoutingTable(context) val path = ctxMap.value routingTable.removeRoute(path) + } + } + + protected def void unregisterRpcService(RpcServiceRegistrationImpl registration) { + val type = registration.serviceType; + + val proxy = managedProxies.get(type); + if (proxy.proxy.delegate === registration.instance) { + proxy.proxy.delegate = null; } } + + def createDelegate(Class type) { + getManagedDirectProxy(type); + } + + def getRpcRouters() { + return Collections.unmodifiableMap(rpcRouters); + } + + override close() { + dataConsumerRegistration.unregister() + dataProviderRegistration.unregister() + notifyConsumerRegistration.unregister() + notifyProviderRegistration.unregister() + } + } class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { @@ -265,10 +334,6 @@ class RoutedRpcRegistrationImpl extends AbstractObjectRegi unregisterPath(context, instance); } - override getService() { - return instance; - } - override registerPath(Class context, InstanceIdentifier path) { checkClosed() broker.registerPath(this, context, path); @@ -279,9 +344,35 @@ class RoutedRpcRegistrationImpl extends AbstractObjectRegi broker.unregisterPath(this, context, path); } + override getServiceType() { + return router.serviceType; + } + private def checkClosed() { if (closed) throw new IllegalStateException("Registration was closed."); } } + +class RpcServiceRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { + + val ServiceRegistration osgiRegistration; + private var BindingAwareBrokerImpl broker; + + @Property + val Class serviceType; + + public new(Class type, T service, ServiceRegistration osgiReg, BindingAwareBrokerImpl broker) { + super(service); + this._serviceType = type; + this.osgiRegistration = osgiReg; + this.broker = broker; + } + + override protected removeRegistration() { + broker.unregisterRpcService(this); + broker = null; + } + +}