X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingAwareBrokerImpl.xtend;h=740ae887b0588bcc7648aaba49bb7f63eef5b53a;hb=8b6075992f1e18eb678ee4e50e13b3d2d1397a85;hp=298a74ece5f71982ae7bcbb48f205e4039c44046;hpb=fe024ad74b8656c3ee61b9ddff6009a779aa2189;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend index 298a74ece5..740ae887b0 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingAwareBrokerImpl.xtend @@ -28,28 +28,63 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderCo import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration +import org.opendaylight.controller.sal.binding.api.data.DataProviderService +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService +import org.opendaylight.controller.sal.binding.spi.RpcRouter +import java.util.concurrent.ConcurrentHashMap +import static com.google.common.base.Preconditions.* +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.yang.binding.BaseIdentity +import com.google.common.collect.Multimap +import com.google.common.collect.HashMultimap +import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.* +import java.util.concurrent.Executors class BindingAwareBrokerImpl implements BindingAwareBroker { private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl) - + private val clsPool = ClassPool.getDefault() private var RuntimeCodeGenerator generator; - private Map, RpcProxyContext> managedProxies = new HashMap(); + + /** + * Map of all Managed Direct Proxies + * + */ + private val Map, RpcProxyContext> managedProxies = new ConcurrentHashMap(); + + /** + * + * Map of all available Rpc Routers + * + * + */ + private val Map, RpcRouter> rpcRouters = new ConcurrentHashMap(); + private var NotificationBrokerImpl notifyBroker + private var DataBrokerImpl dataBroker private var ServiceRegistration notifyBrokerRegistration - + @Property var BundleContext brokerBundleContext def start() { initGenerator(); + val executor = Executors.newCachedThreadPool; // Initialization of notificationBroker - notifyBroker = new NotificationBrokerImpl(null); + notifyBroker = new NotificationBrokerImpl(executor); + notifyBroker.invokerFactory = generator.invokerFactory; + dataBroker = new DataBrokerImpl(); val brokerProperties = newProperties(); notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker, brokerProperties) brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties) + brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties) + brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties) + } def initGenerator() { @@ -91,22 +126,23 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { * * If proxy class does not exist for supplied service class it will be generated automatically. */ - def getManagedDirectProxy(Class service) { - + private def getManagedDirectProxy(Class service) { + var RpcProxyContext existing = null if ((existing = managedProxies.get(service)) != null) { return existing.proxy } - val proxyClass = generator.generateDirectProxy(service) - val rpcProxyCtx = new RpcProxyContext(proxyClass) + val proxyInstance = generator.getDirectProxyFor(service) + val rpcProxyCtx = new RpcProxyContext(proxyInstance.class) val properties = new Hashtable() - rpcProxyCtx.proxy = proxyClass.newInstance as RpcService + rpcProxyCtx.proxy = proxyInstance as RpcService properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties) managedProxies.put(service, rpcProxyCtx) return rpcProxyCtx.proxy } + /** * Registers RPC Implementation * @@ -114,11 +150,138 @@ class BindingAwareBrokerImpl implements BindingAwareBroker { def registerRpcImplementation(Class type, T service, OsgiProviderContext context, Hashtable properties) { val proxy = getManagedDirectProxy(type) - if(proxy.delegate != null) { - throw new IllegalStateException("Service " + type + "is already registered"); - } + checkState(proxy.delegate === null, "The Service for type {} is already registered", type) + val osgiReg = context.bundleContext.registerService(type, service, properties); proxy.delegate = service; return new RpcServiceRegistrationImpl(type, service, osgiReg); } + + def RpcRegistration registerMountedRpcImplementation(Class type, T service, + InstanceIdentifier identifier, OsgiProviderContext context) { + throw new UnsupportedOperationException("TODO: auto-generated method stub") + } + + def RoutedRpcRegistration registerRoutedRpcImplementation(Class type, T service, + OsgiProviderContext context) { + val router = resolveRpcRouter(type); + checkState(router !== null) + return new RoutedRpcRegistrationImpl(service, router, this) + } + + private def RpcRouter resolveRpcRouter(Class type) { + + val router = rpcRouters.get(type); + if (router !== null) { + return router as RpcRouter; + } + + // We created Router + val newRouter = generator.getRouterFor(type); + checkState(newRouter !== null); + rpcRouters.put(type, newRouter); + + // We create / update Direct Proxy for router + val proxy = getManagedDirectProxy(type); + proxy.delegate = newRouter.invocationProxy + return newRouter; + + } + + protected def void registerPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + routingTable.updateRoute(path, registration.instance) + val success = paths.put(context, path); + } + + protected def void unregisterPath(RoutedRpcRegistrationImpl registration, + Class context, InstanceIdentifier path) { + + val router = registration.router; + val paths = registration.registeredPaths; + + val routingTable = router.getRoutingTable(context) + checkState(routingTable != null); + + // Updating internal structure of registration + val target = routingTable.getRoute(path) + checkState(target === registration.instance) + routingTable.removeRoute(path) + checkState(paths.remove(context, path)); + } + + protected def void unregisterRoutedRpcService(RoutedRpcRegistrationImpl registration) { + + val router = registration.router; + val paths = registration.registeredPaths; + + for (ctxMap : registration.registeredPaths.entries) { + val context = ctxMap.key + val routingTable = router.getRoutingTable(context) + val path = ctxMap.value + routingTable.removeRoute(path) + + } + } +} + +class RoutedRpcRegistrationImpl extends AbstractObjectRegistration implements RoutedRpcRegistration { + + @Property + private val BindingAwareBrokerImpl broker; + + @Property + private val RpcRouter router; + + @Property + private val Multimap, InstanceIdentifier> registeredPaths = HashMultimap.create(); + + private var closed = false; + + new(T instance, RpcRouter backingRouter, BindingAwareBrokerImpl broker) { + super(instance) + _router = backingRouter; + _broker = broker; + } + + override protected removeRegistration() { + closed = true + broker.unregisterRoutedRpcService(this) + } + + override registerInstance(Class context, InstanceIdentifier instance) { + registerPath(context, instance); + } + + override unregisterInstance(Class context, InstanceIdentifier instance) { + unregisterPath(context, instance); + } + + override getService() { + return instance; + } + + override registerPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.registerPath(this, context, path); + } + + override unregisterPath(Class context, InstanceIdentifier path) { + checkClosed() + broker.unregisterPath(this, context, path); + } + + private def checkClosed() { + if (closed) + throw new IllegalStateException("Registration was closed."); + } + }