/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.sal.binding.impl
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45 import java.util.Collections
46 import org.opendaylight.yangtools.yang.binding.DataObject
47 import java.util.concurrent.locks.ReentrantLock
48 import java.util.concurrent.Callable
49 import java.util.WeakHashMap
50 import javax.annotation.concurrent.GuardedBy
51 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry
/**
 * Binding-aware broker: hands out consumer / provider contexts, keeps one
 * managed direct proxy per RPC service interface and one RPC router per
 * routed RPC service interface.
 */
class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable {
    private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)

    private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();

    /** Javassist class pool shared by every broker instance. */
    private static val clsPool = ClassPool.getDefault()

    /**
     * Runtime code generator used to create direct proxies and routers.
     * Public and mutable, so it may also be assigned from outside this class.
     */
    public static var RuntimeCodeGenerator generator;

    /**
     * Map of all Managed Direct Proxies, keyed by RPC service interface.
     *
     * java.util.Map is written fully qualified because the import list of
     * this file does not bring it into scope.
     */
    private val java.util.Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();

    /**
     * Map of all available Rpc Routers, keyed by RPC service interface.
     *
     * WeakHashMap is not synchronized; this class only touches it while
     * holding routerGenerationLock.
     */
    private val java.util.Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();

    /** Exposed through generated accessors; nothing in this class assigns it. */
    @Property
    private var NotificationProviderService notifyBroker

    /** Exposed through generated accessors; nothing in this class assigns it. */
    @Property
    private var DataProviderService dataBroker

    /** Bundle context used to register generated proxies as OSGi services. */
    @Property
    var BundleContext brokerBundleContext

    ServiceRegistration<NotificationProviderService> notifyProviderRegistration

    ServiceRegistration<NotificationService> notifyConsumerRegistration

    ServiceRegistration<DataProviderService> dataProviderRegistration

    ServiceRegistration<DataBrokerService> dataConsumerRegistration

    /** Serializes creation of managed direct proxies. */
    private val proxyGenerationLock = new ReentrantLock;

    /** Guards rpcRouters and serializes creation of routers. */
    private val routerGenerationLock = new ReentrantLock;

    /**
     * Creates the broker and prepares the runtime code generator.
     *
     * @param bundleContext bundle context used for OSGi service registrations
     */
    public new(BundleContext bundleContext) {
        _brokerBundleContext = bundleContext;

        log.info("Starting MD-SAL: Binding Aware Broker");

        // Proxies and routers cannot be generated without a generator.
        // Do not overwrite one that was already assigned.
        if (generator === null) {
            initGenerator();
        }

        log.info("Starting MD-SAL: Binding Aware Notification Broker");
        log.info("Starting MD-SAL: Binding Aware Data Broker");
        log.info("MD-SAL: Binding Aware Broker Started");
    }

    /**
     * Adds the class loader of the YANG binding classes to the shared class
     * pool and installs a fresh runtime code generator backed by that pool.
     */
    def initGenerator() {
        // YANG Binding Class Loader
        clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
        generator = new RuntimeCodeGenerator(clsPool);
    }

    /**
     * Creates a consumer context, notifies the consumer about it and returns it.
     */
    override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
        val ctx = consumer.createContext(bundleCtx)
        consumer.onSessionInitialized(ctx)
        return ctx
    }

    /**
     * Creates a provider context, invokes both session callbacks of the
     * provider with it and returns it.
     */
    override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
        val ctx = provider.createContext(bundleCtx)
        provider.onSessionInitialized(ctx)
        provider.onSessionInitiated(ctx as ProviderContext)
        return ctx
    }

    private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
        new OsgiConsumerContext(consumerCtx, this)
    }

    private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
        new OsgiProviderContext(providerCtx, this)
    }

    /**
     * Returns a Managed Direct Proxy for supplied class
     *
     * Managed direct proxy is a generated proxy class conforming to the supplied interface
     * which delegates all calls to the backing delegate.
     *
     * Proxy does not do any validation, null pointer checks or modifies data in any way, it
     * is only used to avoid exposing direct references to backing implementation of service.
     *
     * If proxy class does not exist for supplied service class it will be generated automatically.
     */
    private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
        // Fast path: managedProxies is a ConcurrentHashMap, so it may be read without the lock.
        val existing = managedProxies.get(service)
        if (existing !== null) {
            return existing.proxy
        }
        return withLock(proxyGenerationLock) [ |
            // Re-check under the lock: another thread may have created the proxy meanwhile.
            val maybeProxy = managedProxies.get(service);
            if (maybeProxy !== null) {
                return maybeProxy.proxy;
            }

            val proxyInstance = generator.getDirectProxyFor(service)
            val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
            val properties = new Hashtable<String, String>()
            rpcProxyCtx.proxy = proxyInstance as RpcService
            properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
            rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
            managedProxies.put(service, rpcProxyCtx)
            return rpcProxyCtx.proxy
        ]
    }

    /**
     * Runs the supplied callable while holding the supplied lock.
     *
     * The lock is released in a finally block, so it is freed even when the
     * callable throws.
     *
     * @return whatever the callable returns
     */
    private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
        lock.lock
        try {
            return method.call
        } finally {
            lock.unlock
        }
    }

    /**
     * Registers RPC Implementation
     *
     * Fails with IllegalStateException when the proxy of the service type
     * already has a delegate.
     */
    override <T extends RpcService> addRpcImplementation(Class<T> type, T service) {
        checkNotNull(type, "Service type should not be null")
        checkNotNull(service, "Service implementation should not be null")

        val proxy = getManagedDirectProxy(type)
        checkState(proxy.delegate === null, "The Service for type %s is already registered", type)

        proxy.delegate = service;
        return new RpcServiceRegistrationImpl<T>(type, service, this);
    }

    /**
     * Creates a routed registration of the supplied implementation, backed by
     * the router of the service type (the router is created on first use).
     */
    override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
        checkNotNull(type, "Service type should not be null")
        checkNotNull(service, "Service implementation should not be null")

        val router = resolveRpcRouter(type);
        checkState(router !== null)
        return new RoutedRpcRegistrationImpl<T>(service, router, this)
    }

    /**
     * Returns the managed direct proxy of the supplied service interface.
     */
    override <T extends RpcService> getRpcService(Class<T> service) {
        checkNotNull(service, "Service should not be null");
        return getManagedDirectProxy(service) as T;
    }

    /**
     * Returns the router of the supplied service type, generating it on first
     * use and installing its invocation proxy as delegate of the managed
     * direct proxy of that type.
     *
     * The lookup is done under routerGenerationLock as well, because
     * rpcRouters is a WeakHashMap, which is not safe for unsynchronized access.
     */
    private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
        return withLock(routerGenerationLock) [ |
            val existing = rpcRouters.get(type);
            if (existing !== null) {
                return existing as RpcRouter<T>;
            }

            val newRouter = generator.getRouterFor(type);
            checkState(newRouter !== null);
            rpcRouters.put(type, newRouter);

            // We create / update Direct Proxy for router
            val proxy = getManagedDirectProxy(type);
            proxy.delegate = newRouter.invocationProxy
            return newRouter;
        ]
    }

    /**
     * Routes the supplied path in the supplied context to the instance of the
     * registration and records the path in the registration.
     */
    protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {

        val router = registration.router;
        val paths = registration.registeredPaths;

        val routingTable = router.getRoutingTable(context)
        checkState(routingTable !== null);

        // Update routing table / send announce to message bus
        routingTable.updateRoute(path, registration.instance)

        // Updating internal structure of registration
        paths.put(context, path);
    }

    /**
     * Removes the route of the supplied path in the supplied context.
     *
     * Fails with IllegalStateException when the route does not point to the
     * instance of the registration or when the registration did not record
     * the path.
     */
    protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {

        val router = registration.router;
        val paths = registration.registeredPaths;

        val routingTable = router.getRoutingTable(context)
        checkState(routingTable !== null);

        // Only the registration which owns the route may remove it
        val target = routingTable.getRoute(path)
        checkState(target === registration.instance)
        routingTable.removeRoute(path)

        // Updating internal structure of registration
        checkState(paths.remove(context, path));
    }

    /**
     * Removes every route recorded in the supplied registration from the
     * routing tables of its router.
     */
    protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {

        val router = registration.router;
        val paths = registration.registeredPaths;

        for (ctxMap : paths.entries) {
            val context = ctxMap.key
            val routingTable = router.getRoutingTable(context)
            val path = ctxMap.value
            routingTable.removeRoute(path)
        }
    }

    /**
     * Clears the delegate of the managed proxy of the registered service type,
     * but only when the delegate still is the instance of the registration.
     */
    protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {

        val type = registration.serviceType;

        val proxy = managedProxies.get(type);
        if (proxy !== null && proxy.proxy.delegate === registration.instance) {
            proxy.proxy.delegate = null;
        }
    }

    /**
     * Makes sure a managed direct proxy exists for the supplied type and returns it.
     */
    def createDelegate(Class<? extends RpcService> type) {
        getManagedDirectProxy(type);
    }

    /**
     * Returns a read-only view of the routers; the view is backed by the
     * internal, unsynchronized map.
     */
    def getRpcRouters() {
        return Collections.unmodifiableMap(rpcRouters);
    }

    /**
     * Unregisters the data and notification service registrations.
     *
     * Null-safe calls are used because nothing in this class assigns the
     * registration fields, so any of them may still be null.
     */
    override close() {
        dataConsumerRegistration?.unregister()
        dataProviderRegistration?.unregister()
        notifyConsumerRegistration?.unregister()
        notifyProviderRegistration?.unregister()
    }
}
/**
 * Registration of a routed RPC implementation. Paths are registered and
 * unregistered through the broker, which reads the router and the recorded
 * paths of this registration through the generated accessors.
 */
class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {

    private val BindingAwareBrokerImpl broker;

    /** Router which holds the routing tables this registration writes to. */
    @Property
    private val RpcRouter<T> router;

    /** Paths registered through this registration, grouped by routing context. */
    @Property
    private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();

    /** Set once removeRegistration ran; later path changes are rejected. */
    private var closed = false;

    /**
     * @param instance registered service implementation
     * @param backingRouter router of the service type
     * @param broker broker which performs the routing table updates
     */
    new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
        super(instance)
        _router = backingRouter;
        this.broker = broker;
    }

    /**
     * Marks the registration closed and lets the broker remove every route
     * recorded in it.
     */
    override protected removeRegistration() {
        closed = true
        broker.unregisterRoutedRpcService(this)
    }

    /** Same as registerPath. */
    override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        registerPath(context, instance);
    }

    /** Same as unregisterPath. */
    override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        unregisterPath(context, instance);
    }

    /**
     * Routes the supplied path in the supplied context to the registered instance.
     *
     * @throws IllegalStateException when the registration was already closed
     */
    override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.registerPath(this, context, path);
    }

    /**
     * Removes the route of the supplied path in the supplied context.
     *
     * @throws IllegalStateException when the registration was already closed
     */
    override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.unregisterPath(this, context, path);
    }

    override getServiceType() {
        return router.serviceType;
    }

    /** Throws IllegalStateException when the registration was already closed. */
    private def checkClosed() {
        if (closed) {
            throw new IllegalStateException("Registration was closed.");
        }
    }
}
/**
 * Registration of a global (non-routed) RPC implementation. Removing the
 * registration is delegated to the broker, which reads the service type
 * through the generated accessor.
 */
class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {

    private var BindingAwareBrokerImpl broker;

    /** RPC service interface the implementation was registered for. */
    @Property
    val Class<T> serviceType;

    /**
     * @param type RPC service interface
     * @param service registered implementation
     * @param broker broker which created this registration
     */
    public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
        super(service)
        this._serviceType = type;
        this.broker = broker;
    }

    /** Lets the broker detach the registered implementation. */
    override protected removeRegistration() {
        broker.unregisterRpcService(this);
    }
}