2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.binding.impl
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45 import java.util.Collections
46 import org.opendaylight.yangtools.yang.binding.DataObject
47 import org.opendaylight.controller.sal.binding.impl.connect.dom.ConnectorActivator
// Binding-aware broker implementation; declared to implement BindingAwareBroker and AutoCloseable.
// NOTE(review): the embedded original line numbers in this listing are not contiguous, so some
// original lines (annotations, blank lines, closing braces, short statements) may be missing here;
// confirm against the complete file before relying on these notes.
49 class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
// Class-wide SLF4J logger.
50 private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
// Instance identifier built by an InstanceIdentifier builder with no further arguments.
// It is not read anywhere in the visible part of this file.
53 private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
// Javassist class pool that initGenerator() hands to the RuntimeCodeGenerator.
55 private val clsPool = ClassPool.getDefault()
// Code generator used for the invoker factory, direct proxies and RPC routers; assigned in initGenerator().
56 private var RuntimeCodeGenerator generator;
// The two maps below are keyed by RPC service interface class. managedProxies is filled by
// getManagedDirectProxy(); rpcRouters is filled by resolveRpcRouter() and exposed read-only
// through getRpcRouters().
60 * Map of all Managed Direct Proxies
63 private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
67 * Map of all available Rpc Routers
71 private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
// Notification and data brokers; both are instantiated by the startup statements and then
// registered as OSGi services.
74 private var NotificationBrokerImpl notifyBroker
77 private var DataBrokerImpl dataBroker
// Bundle context supplied to the constructor; used to register the broker services and the managed proxies.
80 var BundleContext brokerBundleContext
// OSGi registrations of the notification and data brokers. They are assigned by the startup
// statements and released by the unregister() calls near the end of this class.
82 ServiceRegistration<NotificationProviderService> notifyProviderRegistration
84 ServiceRegistration<NotificationService> notifyConsumerRegistration
86 ServiceRegistration<DataProviderService> dataProviderRegistration
88 ServiceRegistration<DataBrokerService> dataConsumerRegistration
// Connector constructed from the data broker and the bundle context, then started, during startup.
90 ConnectorActivator connectorActivator
// Constructor: stores the supplied bundle context in the backing field of brokerBundleContext.
93 public new(BundleContext bundleContext) {
94 _brokerBundleContext = bundleContext;
// NOTE(review): the embedded numbering jumps from 94 to 98 here. The statements below read like a
// separate startup routine whose header (and possibly a call that initialises `generator`) is not
// visible in this listing; confirm against the complete file.
98 log.info("Starting MD-SAL: Binding Aware Broker");
// One cached thread pool is shared by the notification broker and the data broker.
101 val executor = Executors.newCachedThreadPool;
102 // Initialization of notificationBroker
103 log.info("Starting MD-SAL: Binding Aware Notification Broker");
104 notifyBroker = new NotificationBrokerImpl(executor);
// NOTE(review): `generator` is read here, but no assignment or initGenerator() call is visible
// before this point; verify it is initialised first, otherwise this dereferences null.
105 notifyBroker.invokerFactory = generator.invokerFactory;
107 log.info("Starting MD-SAL: Binding Aware Data Broker");
108 dataBroker = new DataBrokerImpl();
109 dataBroker.executor = executor;
// Properties object passed along with the broker service registrations below.
111 val brokerProperties = newProperties();
// NOTE(review): this message repeats the one logged before the data broker was created, yet what
// follows is the OSGi service registration. Possibly a copy-paste; confirm the intended text.
// The notification broker is registered under NotificationProviderService and NotificationService,
// the data broker under DataProviderService and DataBrokerService.
// NOTE(review): the first registerService call ends with a trailing comma; its continuation line is
// not visible in this listing.
114 log.info("Starting MD-SAL: Binding Aware Data Broker");
115 notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
117 notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
118 dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
119 dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
// Create and start the connector, passing it the data broker and the bundle context.
121 connectorActivator = new ConnectorActivator(dataBroker,brokerBundleContext);
122 connectorActivator.start();
123 log.info("MD-SAL: Binding Aware Broker Started");
// Creates the runtime code generator: appends a LoaderClassPath built from RpcService's class loader
// to the Javassist class pool, then constructs a RuntimeCodeGenerator over that pool and stores it
// in `generator`.
126 def initGenerator() {
128 // YANG Binding Class Loader
129 clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
130 generator = new RuntimeCodeGenerator(clsPool);
// Registers a binding-aware consumer: builds an OsgiConsumerContext for the consumer's bundle
// context (via createContext) and passes it to consumer.onSessionInitialized.
// NOTE(review): no return statement is visible in this listing (the embedded numbering skips
// 136-138); confirm what the method returns.
133 override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
134 val ctx = consumer.createContext(bundleCtx)
135 consumer.onSessionInitialized(ctx)
// Registers a binding-aware provider: builds an OsgiProviderContext for the provider's bundle
// context (via createContext) and notifies the provider twice, first through onSessionInitialized
// and then through onSessionInitiated with the context cast to ProviderContext.
// NOTE(review): no return statement is visible in this listing (the embedded numbering skips
// 143-145); confirm what the method returns.
139 override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
140 val ctx = provider.createContext(bundleCtx)
141 provider.onSessionInitialized(ctx)
142 provider.onSessionInitiated(ctx as ProviderContext)
// Builds the consumer-side session context from the consumer's bundle context and this broker.
// The `consumer` argument itself is not used in the visible body; it selects this overload.
146 private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
147 new OsgiConsumerContext(consumerCtx, this)
// Builds the provider-side session context from the provider's bundle context and this broker.
// The `provider` argument itself is not used in the visible body; it selects this overload.
150 private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
151 new OsgiProviderContext(providerCtx, this)
155 * Returns a Managed Direct Proxy for supplied class
157 * Managed direct proxy is a generated proxy class conforming to the supplied interface
158 * which delegates all calls to the backing delegate.
160 * The proxy does not perform any validation or null-pointer checks and does not modify data in any way; it
161 * is only used to avoid exposing direct references to the backing implementation of the service.
163 * If proxy class does not exist for supplied service class it will be generated automatically.
// Returns the managed direct proxy for `service`, creating it on first use.
// A cached proxy from managedProxies is returned when one exists. Otherwise a proxy is obtained
// from the generator, wrapped in an RpcProxyContext, registered as an OSGi service under the
// broker's bundle context, and cached.
// NOTE(review): the lookup and the later put are separate steps with no visible synchronisation, so
// concurrent first calls for the same service class could each generate and register a proxy;
// confirm whether callers are serialised.
165 private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
167 var RpcProxyContext existing = null
168 if ((existing = managedProxies.get(service)) != null) {
169 return existing.proxy
171 val proxyInstance = generator.getDirectProxyFor(service)
172 val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
173 val properties = new Hashtable<String, String>()
174 rpcProxyCtx.proxy = proxyInstance as RpcService
// Mark the OSGi service as a consumer proxy through the salServiceType property.
176 properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
177 rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
178 managedProxies.put(service, rpcProxyCtx)
179 return rpcProxyCtx.proxy
183 * Registers RPC Implementation
// Registers `service` as the implementation of RPC interface `type`.
// The managed direct proxy for `type` must not have a delegate yet; otherwise checkState fails
// (Guava's checkState throws IllegalStateException) with the message below.
// The implementation is registered as an OSGi service in the provider's bundle context with the
// supplied properties and only then installed as the proxy's delegate.
// Returns an RpcServiceRegistrationImpl wrapping the type, the service, the OSGi registration and this broker.
186 def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
187 Hashtable<String, String> properties) {
188 val proxy = getManagedDirectProxy(type)
189 checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
191 val osgiReg = context.bundleContext.registerService(type, service, properties);
192 proxy.delegate = service;
193 return new RpcServiceRegistrationImpl<T>(type, service, osgiReg,this);
// Creates a routed RPC registration for `service`: resolves (or creates) the router for `type`,
// asserts that it is non-null, and returns a RoutedRpcRegistrationImpl bound to that router and
// this broker.
// The `context` parameter is not used in the visible body.
196 def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
197 OsgiProviderContext context) {
198 val router = resolveRpcRouter(type);
199 checkState(router !== null)
200 return new RoutedRpcRegistrationImpl<T>(service, router, this)
// Returns the RPC router for `type`, creating it on first use.
// A router cached in rpcRouters is returned directly. Otherwise one is obtained from the generator,
// checked to be non-null, cached, and its invocation proxy is installed as the delegate of the
// managed direct proxy for `type`.
// NOTE(review): the statement returning the new router is not visible in this listing (the embedded
// numbering skips 218-221); confirm against the complete file.
203 private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
205 val router = rpcRouters.get(type);
206 if (router !== null) {
207 return router as RpcRouter<T>;
211 val newRouter = generator.getRouterFor(type);
212 checkState(newRouter !== null);
213 rpcRouters.put(type, newRouter);
215 // We create / update Direct Proxy for router
216 val proxy = getManagedDirectProxy(type);
217 proxy.delegate = newRouter.invocationProxy
// Routes `path` in routing context `context` to the instance held by `registration`.
// Looks up the router's routing table for the context (checkState fails when there is none),
// points the route for `path` at the registration's instance, and records the (context, path)
// pair in the registration's registeredPaths multimap.
// NOTE(review): `success` is not read in the visible lines. The two inline comments below also
// appear swapped relative to the statements they precede; confirm.
222 protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
223 Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
225 val router = registration.router;
226 val paths = registration.registeredPaths;
228 val routingTable = router.getRoutingTable(context)
229 checkState(routingTable != null);
231 // Updating internal structure of registration
232 routingTable.updateRoute(path, registration.instance)
233 // Update routing table / send announce to message bus
235 val success = paths.put(context, path);
// Removes the route for `path` in routing context `context` that belongs to `registration`.
// checkState guards three conditions: the context has a routing table, the current route target is
// the very instance held by the registration (identity comparison), and the (context, path) pair
// was actually present in the registration's registeredPaths.
// The route is removed from the routing table before the pair is removed from registeredPaths.
238 protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
239 Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
241 val router = registration.router;
242 val paths = registration.registeredPaths;
244 val routingTable = router.getRoutingTable(context)
245 checkState(routingTable != null);
247 // Updating internal structure of registration
248 val target = routingTable.getRoute(path)
249 checkState(target === registration.instance)
250 routingTable.removeRoute(path)
251 checkState(paths.remove(context, path));
// Removes every route recorded for `registration`: for each (context, path) entry in its
// registeredPaths, the path is removed from the router's routing table for that context.
// NOTE(review): the local `paths` is not read in the visible lines (the loop reads
// registration.registeredPaths directly). Unlike registerPath/unregisterPath there is no check that
// the routing table is non-null, and the entries are not removed from registeredPaths here; confirm
// whether that is intended.
254 protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
256 val router = registration.router;
257 val paths = registration.registeredPaths;
259 for (ctxMap : registration.registeredPaths.entries) {
260 val context = ctxMap.key
261 val routingTable = router.getRoutingTable(context)
262 val path = ctxMap.value
263 routingTable.removeRoute(path)
// Detaches the implementation held by `registration` from the managed direct proxy of its service type.
// The delegate is cleared only when it is the very instance held by the registration (identity
// comparison), so a delegate installed by someone else is left untouched.
// NOTE(review): assumes managedProxies contains an entry for the type; a missing entry would make
// `proxy` null here. Verify that entries are never removed.
267 protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
269 val type = registration.serviceType;
271 val proxy = managedProxies.get(type);
272 if(proxy.proxy.delegate === registration.instance) {
273 proxy.proxy.delegate = null;
// Public entry point that delegates to getManagedDirectProxy(type), which returns the cached
// managed direct proxy for `type` or creates and registers one.
277 def createDelegate(Class<? extends RpcService> type) {
278 getManagedDirectProxy(type);
// Returns an unmodifiable view of the rpcRouters map (not a copy).
281 def getRpcRouters() {
282 return Collections.unmodifiableMap(rpcRouters);
// Unregisters the four broker OSGi services, in the reverse of the order in which they were registered.
// NOTE(review): the enclosing method header is not visible in this listing; since the class
// implements AutoCloseable this is presumably the body of close(). Confirm.
// NOTE(review): no shutdown of the startup executor or of connectorActivator is visible here, nor
// any unregistration of the managed proxies; confirm whether that happens elsewhere.
286 dataConsumerRegistration.unregister()
287 dataProviderRegistration.unregister()
288 notifyConsumerRegistration.unregister()
289 notifyProviderRegistration.unregister()
// Registration handle for a routed RPC implementation. It keeps the (context, path) pairs under
// which the instance is registered and forwards every path operation to the owning broker.
// NOTE(review): the embedded original line numbers are not contiguous, so short lines (closing
// braces, a super-constructor call, the `broker` assignment) may be missing from this listing.
294 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
// Broker that owns the routing tables and performs the actual path (un)registration.
297 private val BindingAwareBrokerImpl broker;
// Router backing this registration; also the source of the reported service type.
300 private val RpcRouter<T> router;
// Paths currently registered through this handle, grouped by routing context.
303 private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
// NOTE(review): `closed` is never assigned in the visible lines; verify where it is set to true.
305 private var closed = false;
// Constructor; only the router assignment is visible in this listing.
307 new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
309 _router = backingRouter;
// Asks the broker to drop every route recorded for this registration.
313 override protected removeRegistration() {
315 broker.unregisterRoutedRpcService(this)
// registerInstance/unregisterInstance simply forward to registerPath/unregisterPath below.
318 override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
319 registerPath(context, instance);
322 override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
323 unregisterPath(context, instance);
// Forwards to the broker, which updates the routing table and this registration's registeredPaths.
326 override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
328 broker.registerPath(this, context, path);
331 override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
333 broker.unregisterPath(this, context, path);
// The service type is taken from the backing router.
336 override getServiceType() {
337 return router.serviceType;
// NOTE(review): the condition guarding this throw is not visible in this listing (the embedded
// numbering skips 341); presumably it tests `closed`. Confirm.
340 private def checkClosed() {
342 throw new IllegalStateException("Registration was closed.");
// Registration handle for a directly registered (non-routed) RPC implementation. It holds the OSGi
// service registration, the owning broker and the RPC interface class.
// NOTE(review): the embedded original line numbers are not contiguous and the listing ends inside
// this class, so parts of it (a super-constructor call, the `broker` assignment, closing braces,
// any further members) may not be visible here.
346 class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
// OSGi registration of the implementation, as passed to the constructor.
348 val ServiceRegistration<T> osgiRegistration;
349 private var BindingAwareBrokerImpl broker;
// RPC interface class this registration was made for; read by the broker in unregisterRpcService.
352 val Class<T> serviceType;
354 public new(Class<T> type, T service, ServiceRegistration<T> osgiReg,BindingAwareBrokerImpl broker) {
356 this._serviceType = type;
357 this.osgiRegistration = osgiReg;
// Asks the broker to detach this implementation from the managed direct proxy.
// NOTE(review): no osgiRegistration.unregister() call is visible in the lines shown; verify that
// the OSGi service is unregistered as well.
361 override protected removeRegistration() {
362 broker.unregisterRpcService(this);