X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingAwareBrokerImpl.xtend;h=9381a5a070e67e7c82226c5b920b06f9a952cc67;hp=8a3d2c0ecc12a49161bee2ec8012d5f2707279e7;hb=33ff5360d4f3aec58518e22f08c706455865d685;hpb=526185d061ed50c75890b31a376e9495144b660a diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend index 8a3d2c0ecc..9381a5a070 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend @@ -31,28 +31,85 @@ import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator import org.opendaylight.yangtools.yang.binding.InstanceIdentifier import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration +import org.opendaylight.controller.sal.binding.api.data.DataProviderService +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService +import org.opendaylight.controller.sal.binding.spi.RpcRouter +import java.util.concurrent.ConcurrentHashMap +import static com.google.common.base.Preconditions.* +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.yang.binding.BaseIdentity +import com.google.common.collect.Multimap +import com.google.common.collect.HashMultimap +import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.* +import java.util.concurrent.Executors +import java.util.Collections +import org.opendaylight.yangtools.yang.binding.DataObject +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.Callable +import java.util.WeakHashMap +import javax.annotation.concurrent.GuardedBy +import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry -class BindingAwareBrokerImpl implements BindingAwareBroker { +class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) - private val clsPool = ClassPool.getDefault() - private var RuntimeCodeGenerator generator; - private Map, RpcProxyContext> managedProxies = new HashMap(); - private var NotificationBrokerImpl notifyBroker - private var ServiceRegistration notifyBrokerRegistration + private InstanceIdentifier root = InstanceIdentifier.builder().toInstance(); + + private static val clsPool = ClassPool.getDefault() + public static var RuntimeCodeGenerator generator; + + /** + * Map of all Managed Direct Proxies + * + */ + private val Map, RpcProxyContext> managedProxies = new ConcurrentHashMap(); + + /** + * + * Map of all available Rpc Routers + * + * + */ + private val Map, RpcRouter> rpcRouters = new WeakHashMap(); + + @Property + private var NotificationProviderService notifyBroker + + @Property + private var DataProviderService dataBroker @Property var BundleContext brokerBundleContext + ServiceRegistration notifyProviderRegistration + + ServiceRegistration notifyConsumerRegistration + + ServiceRegistration dataProviderRegistration + + ServiceRegistration dataConsumerRegistration + + private val proxyGenerationLock = new ReentrantLock; + + private val routerGenerationLock = new ReentrantLock; + + public new(BundleContext bundleContext) { + _brokerBundleContext = bundleContext; + } + def start() { + log.info("Starting MD-SAL: Binding Aware Broker"); initGenerator(); + val executor = Executors.newCachedThreadPool; + // Initialization of notificationBroker - notifyBroker = new NotificationBrokerImpl(null); - val brokerProperties = newProperties(); - notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, - brokerProperties) - brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) + log.info("Starting MD-SAL: Binding Aware Notification Broker"); + + log.info("Starting MD-SAL: Binding Aware Data Broker"); + + log.info("Starting MD-SAL: Binding Aware Data Broker"); + log.info("MD-SAL: Binding Aware Broker Started"); } def initGenerator() { @@ -94,46 +151,235 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { * * If proxy class does not exist for supplied service class it will be generated automatically. */ - def getManagedDirectProxy(Class service) { - + private def getManagedDirectProxy(Class service) { var RpcProxyContext existing = null - if((existing = managedProxies.get(service)) != null) { + + if ((existing = managedProxies.get(service)) != null) { return existing.proxy } - val proxyClass = generator.generateDirectProxy(service) - val rpcProxyCtx = new RpcProxyContext(proxyClass) - val properties = new Hashtable() - rpcProxyCtx.proxy = proxyClass.newInstance as RpcService + return withLock(proxyGenerationLock) [ | + val maybeProxy = managedProxies.get(service); + if (maybeProxy !== null) { + return maybeProxy.proxy; + } + + + val proxyInstance = generator.getDirectProxyFor(service) + val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) + val properties = new Hashtable() + rpcProxyCtx.proxy = proxyInstance as RpcService + properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY + rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) + managedProxies.put(service, rpcProxyCtx) + return rpcProxyCtx.proxy + ] + } - properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY - rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) - managedProxies.put(service, rpcProxyCtx) - return rpcProxyCtx.proxy + private static def T withLock(ReentrantLock lock, Callable method) { + try { + lock.lock(); + val ret = method.call; + return ret; + } finally { + lock.unlock(); + } } /** * Registers RPC Implementation * */ - def registerRpcImplementation(Class type, T service, OsgiProviderContext context, - Hashtable properties) { + override addRpcImplementation(Class type, T service) { + checkNotNull(type, "Service type should not be null") + checkNotNull(service, "Service type should not be null") + val proxy = getManagedDirectProxy(type) - if(proxy.delegate != null) { - throw new IllegalStateException("Service " + type + "is already registered"); - } - val osgiReg = context.bundleContext.registerService(type, service, properties); + checkState(proxy.delegate === null, "The Service for type %s is already registered", type) + proxy.delegate = service; - return new RpcServiceRegistrationImpl(type, service, osgiReg); + return new RpcServiceRegistrationImpl(type, service, this); + } + + override RoutedRpcRegistration addRoutedRpcImplementation(Class type, T service) { + checkNotNull(type, "Service type should not be null") + checkNotNull(service, "Service type should not be null") + + val router = resolveRpcRouter(type); + checkState(router !== null) + return new RoutedRpcRegistrationImpl(service, router, this) + } + + override getRpcService(Class service) { + checkNotNull(service, "Service should not be null"); + return getManagedDirectProxy(service) as T; + } + + private def RpcRouter resolveRpcRouter(Class type) { + + val router = rpcRouters.get(type); + if (router !== null) { + return router as RpcRouter; + } + + // We created Router + return withLock(routerGenerationLock) [ | + val maybeRouter = rpcRouters.get(type); + if (maybeRouter !== null) { + return maybeRouter as RpcRouter; + } + + val newRouter = generator.getRouterFor(type); + checkState(newRouter !== null); + rpcRouters.put(type, newRouter); + // We create / update Direct Proxy for router + val proxy = getManagedDirectProxy(type); + proxy.delegate = newRouter.invocationProxy + return newRouter; + ] + + } + + protected def void registerPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + routingTable.updateRoute(path, registration.instance) + + // Update routing table / send announce to message bus + val success = paths.put(context, path); + } + + protected def void unregisterPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + val target = routingTable.getRoute(path) + checkState(target === registration.instance) + routingTable.removeRoute(path) + checkState(paths.remove(context, path)); + } + + protected def void unregisterRoutedRpcService(RoutedRpcRegistrationImpl registration) { + + val router = registration.router; + val paths = registration.registeredPaths; + + for (ctxMap : registration.registeredPaths.entries) { + val context = ctxMap.key + val routingTable = router.getRoutingTable(context) + val path = ctxMap.value + routingTable.removeRoute(path) + } + } + + protected def void unregisterRpcService(RpcServiceRegistrationImpl registration) { + + val type = registration.serviceType; + + val proxy = managedProxies.get(type); + if (proxy.proxy.delegate === registration.instance) { + proxy.proxy.delegate = null; + } + } + + def createDelegate(Class type) { + getManagedDirectProxy(type); + } + + def getRpcRouters() { + return Collections.unmodifiableMap(rpcRouters); + } + + override close() { + dataConsumerRegistration.unregister() + dataProviderRegistration.unregister() + notifyConsumerRegistration.unregister() + notifyProviderRegistration.unregister() + } + +} + +class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { + + @Property + private val BindingAwareBrokerImpl broker; + + @Property + private val RpcRouter router; + + @Property + private val Multimap, InstanceIdentifier> registeredPaths = HashMultimap.create(); + + private var closed = false; + + new(T instance, RpcRouter backingRouter, BindingAwareBrokerImpl broker) { + super(instance) + _router = backingRouter; + _broker = broker; + } + + override protected removeRegistration() { + closed = true + broker.unregisterRoutedRpcService(this) + } + + override registerInstance(Class context, InstanceIdentifier instance) { + registerPath(context, instance); + } + + override unregisterInstance(Class context, InstanceIdentifier instance) { + unregisterPath(context, instance); + } + + override registerPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.registerPath(this, context, path); } - def RpcRegistration registerMountedRpcImplementation(Class tyoe, T service, InstanceIdentifier identifier, - OsgiProviderContext context, Hashtable properties) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + override unregisterPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.unregisterPath(this, context, path); + } + + override getServiceType() { + return router.serviceType; + } + + private def checkClosed() { + if (closed) + throw new IllegalStateException("Registration was closed."); + } + +} + +class RpcServiceRegistrationImpl extends AbstractObjectRegistration implements RpcRegistration { + + private var BindingAwareBrokerImpl broker; + + @Property + val Class serviceType; + + public new(Class type, T service, BindingAwareBrokerImpl broker) { + super(service); + this._serviceType = type; + this.broker = broker; } - def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, OsgiProviderContext context, - Hashtable properties) { - throw new UnsupportedOperationException("TODO: auto-generated method stub") + override protected removeRegistration() { + broker.unregisterRpcService(this); + broker = null; } }