X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingAwareBrokerImpl.xtend;h=31d5d0126fc8115546d1d1b7709671e946b2e026;hp=298a74ece5f71982ae7bcbb48f205e4039c44046;hb=d2f2d5a34ccd0a715ab6ea7c1f5ac61f16f6e6cc;hpb=738d46bed116293e3e42171ad2035ab805b0b2be diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend index 298a74ece5..31d5d0126f 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend @@ -28,28 +28,99 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration +import org.opendaylight.controller.sal.binding.api.data.DataProviderService +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService +import org.opendaylight.controller.sal.binding.spi.RpcRouter +import java.util.concurrent.ConcurrentHashMap +import static com.google.common.base.Preconditions.* +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.yang.binding.BaseIdentity +import com.google.common.collect.Multimap +import com.google.common.collect.HashMultimap +import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.* +import java.util.concurrent.Executors +import java.util.Collections +import org.opendaylight.yangtools.yang.binding.DataObject +import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator -class BindingAwareBrokerImpl implements BindingAwareBroker { +class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) - + + + private InstanceIdentifier root = InstanceIdentifier.builder().toInstance(); + private val clsPool = ClassPool.getDefault() private var RuntimeCodeGenerator generator; - private Map, RpcProxyContext> managedProxies = new HashMap(); + + + /** + * Map of all Managed Direct Proxies + * + */ + private val Map, RpcProxyContext> managedProxies = new ConcurrentHashMap(); + + /** + * + * Map of all available Rpc Routers + * + * + */ + private val Map, RpcRouter> rpcRouters = new ConcurrentHashMap(); + + @Property private var NotificationBrokerImpl notifyBroker - private var ServiceRegistration notifyBrokerRegistration + + @Property + private var DataBrokerImpl dataBroker @Property var BundleContext brokerBundleContext + + ServiceRegistration notifyProviderRegistration + + ServiceRegistration notifyConsumerRegistration + + ServiceRegistration dataProviderRegistration + + ServiceRegistration dataConsumerRegistration + + ConnectorActivator connectorActivator + + + public new(BundleContext bundleContext) { + _brokerBundleContext = bundleContext; + } def start() { + log.info("Starting MD-SAL: Binding Aware Broker"); initGenerator(); + val executor = Executors.newCachedThreadPool; // Initialization of notificationBroker - notifyBroker = new NotificationBrokerImpl(null); + log.info("Starting MD-SAL: Binding Aware Notification Broker"); + notifyBroker = new NotificationBrokerImpl(executor); + notifyBroker.invokerFactory = generator.invokerFactory; + + log.info("Starting MD-SAL: Binding Aware Data Broker"); + dataBroker = new DataBrokerImpl(); + dataBroker.executor = executor; + val brokerProperties = newProperties(); - notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, + + + log.info("Starting MD-SAL: Binding Aware Data Broker"); + notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, brokerProperties) - brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) + notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) + dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties) + dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties) + + connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext); + connectorActivator.start(); + log.info("MD-SAL: Binding Aware Broker Started"); } def initGenerator() { @@ -91,22 +162,23 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { * * If proxy class does not exist for supplied service class it will be generated automatically. */ - def getManagedDirectProxy(Class service) { - + private def getManagedDirectProxy(Class service) { + var RpcProxyContext existing = null if ((existing = managedProxies.get(service)) != null) { return existing.proxy } - val proxyClass = generator.generateDirectProxy(service) - val rpcProxyCtx = new RpcProxyContext(proxyClass) + val proxyInstance = generator.getDirectProxyFor(service) + val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) val properties = new Hashtable() - rpcProxyCtx.proxy = proxyClass.newInstance as RpcService + rpcProxyCtx.proxy = proxyInstance as RpcService properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) managedProxies.put(service, rpcProxyCtx) return rpcProxyCtx.proxy } + /** * Registers RPC Implementation * @@ -114,11 +186,181 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { def registerRpcImplementation(Class type, T service, OsgiProviderContext context, Hashtable properties) { val proxy = getManagedDirectProxy(type) - if(proxy.delegate != null) { - throw new IllegalStateException("Service " + type + "is already registered"); - } + checkState(proxy.delegate === null, "The Service for type %s is already registered", type) + val osgiReg = context.bundleContext.registerService(type, service, properties); proxy.delegate = service; - return new RpcServiceRegistrationImpl(type, service, osgiReg); + return new RpcServiceRegistrationImpl(type, service, osgiReg,this); + } + + def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, + OsgiProviderContext context) { + val router = resolveRpcRouter(type); + checkState(router !== null) + return new RoutedRpcRegistrationImpl(service, router, this) + } + + private def RpcRouter resolveRpcRouter(Class type) { + + val router = rpcRouters.get(type); + if (router !== null) { + return router as RpcRouter; + } + + // We created Router + val newRouter = generator.getRouterFor(type); + checkState(newRouter !== null); + rpcRouters.put(type, newRouter); + + // We create / update Direct Proxy for router + val proxy = getManagedDirectProxy(type); + proxy.delegate = newRouter.invocationProxy + return newRouter; + + } + + protected def void registerPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + routingTable.updateRoute(path, registration.instance) + // Update routing table / send announce to message bus + + val success = paths.put(context, path); + } + + protected def void unregisterPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + val target = routingTable.getRoute(path) + checkState(target === registration.instance) + routingTable.removeRoute(path) + checkState(paths.remove(context, path)); + } + + protected def void unregisterRoutedRpcService(RoutedRpcRegistrationImpl registration) { + + val router = registration.router; + val paths = registration.registeredPaths; + + for (ctxMap : registration.registeredPaths.entries) { + val context = ctxMap.key + val routingTable = router.getRoutingTable(context) + val path = ctxMap.value + routingTable.removeRoute(path) + } + } + + protected def void unregisterRpcService(RpcServiceRegistrationImpl registration) { + + val type = registration.serviceType; + + val proxy = managedProxies.get(type); + if(proxy.proxy.delegate === registration.instance) { + proxy.proxy.delegate = null; + } + } + + def createDelegate(Class type) { + getManagedDirectProxy(type); + } + + def getRpcRouters() { + return Collections.unmodifiableMap(rpcRouters); + } + + override close() { + dataConsumerRegistration.unregister() + dataProviderRegistration.unregister() + notifyConsumerRegistration.unregister() + notifyProviderRegistration.unregister() + } + +} + +class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { + + @Property + private val BindingAwareBrokerImpl broker; + + @Property + private val RpcRouter router; + + @Property + private val Multimap, InstanceIdentifier> registeredPaths = HashMultimap.create(); + + private var closed = false; + + new(T instance, RpcRouter backingRouter, BindingAwareBrokerImpl broker) { + super(instance) + _router = backingRouter; + _broker = broker; + } + + override protected removeRegistration() { + closed = true + broker.unregisterRoutedRpcService(this) + } + + override registerInstance(Class context, InstanceIdentifier instance) { + registerPath(context, instance); } + + override unregisterInstance(Class context, InstanceIdentifier instance) { + unregisterPath(context, instance); + } + + override registerPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.registerPath(this, context, path); + } + + override unregisterPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.unregisterPath(this, context, path); + } + + override getServiceType() { + return router.serviceType; + } + + private def checkClosed() { + if (closed) + throw new IllegalStateException("Registration was closed."); + } + +} +class RpcServiceRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { + + val ServiceRegistration osgiRegistration; + private var BindingAwareBrokerImpl broker; + + @Property + val Class serviceType; + + public new(Class type, T service, ServiceRegistration osgiReg,BindingAwareBrokerImpl broker) { + super(service); + this._serviceType = type; + this.osgiRegistration = osgiReg; + this.broker= broker; + } + + override protected removeRegistration() { + broker.unregisterRpcService(this); + broker = null; + } + }