/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.binding.impl import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer import org.opendaylight.controller.sal.binding.api.BindingAwareProvider import org.opendaylight.yangtools.yang.binding.RpcService import javassist.ClassPool import org.osgi.framework.BundleContext import java.util.Map import java.util.HashMap import javassist.LoaderClassPath import org.opendaylight.controller.sal.binding.api.BindingAwareBroker import java.util.Hashtable import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.* import org.opendaylight.controller.sal.binding.api.NotificationProviderService import org.osgi.framework.ServiceRegistration import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.* import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.* import org.opendaylight.controller.sal.binding.api.NotificationService import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator import org.opendaylight.yangtools.yang.binding.InstanceIdentifier import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration import org.opendaylight.controller.sal.binding.api.data.DataProviderService import org.opendaylight.controller.sal.binding.api.data.DataBrokerService import org.opendaylight.controller.sal.binding.spi.RpcRouter import java.util.concurrent.ConcurrentHashMap import static com.google.common.base.Preconditions.* import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.yang.binding.BaseIdentity import com.google.common.collect.Multimap import com.google.common.collect.HashMultimap import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.* import java.util.concurrent.Executors class BindingAwareBrokerImpl implements BindingAwareBroker { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) private val clsPool = ClassPool.getDefault() private var RuntimeCodeGenerator generator; /** * Map of all Managed Direct Proxies * */ private val Map, RpcProxyContext> managedProxies = new ConcurrentHashMap(); /** * * Map of all available Rpc Routers * * */ private val Map, RpcRouter> rpcRouters = new ConcurrentHashMap(); @Property private var NotificationBrokerImpl notifyBroker @Property private var DataBrokerImpl dataBroker private var ServiceRegistration notifyBrokerRegistration @Property var BundleContext brokerBundleContext def start() { initGenerator(); val executor = Executors.newCachedThreadPool; // Initialization of notificationBroker notifyBroker = new NotificationBrokerImpl(executor); notifyBroker.invokerFactory = generator.invokerFactory; dataBroker = new DataBrokerImpl(); dataBroker.executor = executor; val brokerProperties = newProperties(); notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, brokerProperties) brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties) brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties) } def initGenerator() { // YANG Binding Class Loader clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader)); generator = new RuntimeCodeGenerator(clsPool); } override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) { val ctx = consumer.createContext(bundleCtx) consumer.onSessionInitialized(ctx) return ctx } override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) { val ctx = provider.createContext(bundleCtx) provider.onSessionInitialized(ctx) provider.onSessionInitiated(ctx as ProviderContext) return ctx } private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) { new OsgiConsumerContext(consumerCtx, this) } private def createContext(BindingAwareProvider provider, BundleContext providerCtx) { new OsgiProviderContext(providerCtx, this) } /** * Returns a Managed Direct Proxy for supplied class * * Managed direct proxy is a generated proxy class conforming to the supplied interface * which delegates all calls to the backing delegate. * * Proxy does not do any validation, null pointer checks or modifies data in any way, it * is only use to avoid exposing direct references to backing implementation of service. * * If proxy class does not exist for supplied service class it will be generated automatically. */ private def getManagedDirectProxy(Class service) { var RpcProxyContext existing = null if ((existing = managedProxies.get(service)) != null) { return existing.proxy } val proxyInstance = generator.getDirectProxyFor(service) val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) val properties = new Hashtable() rpcProxyCtx.proxy = proxyInstance as RpcService properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) managedProxies.put(service, rpcProxyCtx) return rpcProxyCtx.proxy } /** * Registers RPC Implementation * */ def registerRpcImplementation(Class type, T service, OsgiProviderContext context, Hashtable properties) { val proxy = getManagedDirectProxy(type) checkState(proxy.delegate === null, "The Service for type {} is already registered", type) val osgiReg = context.bundleContext.registerService(type, service, properties); proxy.delegate = service; return new RpcServiceRegistrationImpl(type, service, osgiReg); } def RpcRegistration registerMountedRpcImplementation(Class type, T service, InstanceIdentifier identifier, OsgiProviderContext context) { throw new UnsupportedOperationException("TODO: auto-generated method stub") } def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, OsgiProviderContext context) { val router = resolveRpcRouter(type); checkState(router !== null) return new RoutedRpcRegistrationImpl(service, router, this) } private def RpcRouter resolveRpcRouter(Class type) { val router = rpcRouters.get(type); if (router !== null) { return router as RpcRouter; } // We created Router val newRouter = generator.getRouterFor(type); checkState(newRouter !== null); rpcRouters.put(type, newRouter); // We create / update Direct Proxy for router val proxy = getManagedDirectProxy(type); proxy.delegate = newRouter.invocationProxy return newRouter; } protected def void registerPath(RoutedRpcRegistrationImpl registration, Class context, InstanceIdentifier path) { val router = registration.router; val paths = registration.registeredPaths; val routingTable = router.getRoutingTable(context) checkState(routingTable != null); // Updating internal structure of registration routingTable.updateRoute(path, registration.instance) // Update routing table / send announce to message bus val success = paths.put(context, path); } protected def void unregisterPath(RoutedRpcRegistrationImpl registration, Class context, InstanceIdentifier path) { val router = registration.router; val paths = registration.registeredPaths; val routingTable = router.getRoutingTable(context) checkState(routingTable != null); // Updating internal structure of registration val target = routingTable.getRoute(path) checkState(target === registration.instance) routingTable.removeRoute(path) checkState(paths.remove(context, path)); } protected def void unregisterRoutedRpcService(RoutedRpcRegistrationImpl registration) { val router = registration.router; val paths = registration.registeredPaths; for (ctxMap : registration.registeredPaths.entries) { val context = ctxMap.key val routingTable = router.getRoutingTable(context) val path = ctxMap.value routingTable.removeRoute(path) } } def createDelegate(Class type) { getManagedDirectProxy(type); } } class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { @Property private val BindingAwareBrokerImpl broker; @Property private val RpcRouter router; @Property private val Multimap, InstanceIdentifier> registeredPaths = HashMultimap.create(); private var closed = false; new(T instance, RpcRouter backingRouter, BindingAwareBrokerImpl broker) { super(instance) _router = backingRouter; _broker = broker; } override protected removeRegistration() { closed = true broker.unregisterRoutedRpcService(this) } override registerInstance(Class context, InstanceIdentifier instance) { registerPath(context, instance); } override unregisterInstance(Class context, InstanceIdentifier instance) { unregisterPath(context, instance); } override getService() { return instance; } override registerPath(Class context, InstanceIdentifier path) { checkClosed() broker.registerPath(this, context, path); } override unregisterPath(Class context, InstanceIdentifier path) { checkClosed() broker.unregisterPath(this, context, path); } private def checkClosed() { if (closed) throw new IllegalStateException("Registration was closed."); } }