Merge "Bug 164"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45 import java.util.Collections
46 import org.opendaylight.yangtools.yang.binding.DataObject
47 import java.util.concurrent.locks.ReentrantLock
48 import java.util.concurrent.Callable
49 import java.util.WeakHashMap
50 import javax.annotation.concurrent.GuardedBy
51
52 class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
53     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
54
55     private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
56
57     private static val clsPool = ClassPool.getDefault()
58     public static var RuntimeCodeGenerator generator;
59
60     /**
61      * Map of all Managed Direct Proxies
62      * 
63      */
64     private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
65
66     /**
67      * 
68      * Map of all available Rpc Routers
69      * 
70      * 
71      */
72     private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
73
74     @Property
75     private var NotificationProviderService notifyBroker
76
77     @Property
78     private var DataProviderService dataBroker
79
80     @Property
81     var BundleContext brokerBundleContext
82
83     ServiceRegistration<NotificationProviderService> notifyProviderRegistration
84
85     ServiceRegistration<NotificationService> notifyConsumerRegistration
86
87     ServiceRegistration<DataProviderService> dataProviderRegistration
88
89     ServiceRegistration<DataBrokerService> dataConsumerRegistration
90
91     private val proxyGenerationLock = new ReentrantLock;
92
93     private val routerGenerationLock = new ReentrantLock;
94
95     public new(BundleContext bundleContext) {
96         _brokerBundleContext = bundleContext;
97     }
98
99     def start() {
100         log.info("Starting MD-SAL: Binding Aware Broker");
101         initGenerator();
102
103         val executor = Executors.newCachedThreadPool;
104
105         // Initialization of notificationBroker
106         log.info("Starting MD-SAL: Binding Aware Notification Broker");
107
108         log.info("Starting MD-SAL: Binding Aware Data Broker");
109
110         log.info("Starting MD-SAL: Binding Aware Data Broker");
111         log.info("MD-SAL: Binding Aware Broker Started");
112     }
113
114     def initGenerator() {
115
116         // YANG Binding Class Loader
117         clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
118         generator = new RuntimeCodeGenerator(clsPool);
119     }
120
121     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
122         val ctx = consumer.createContext(bundleCtx)
123         consumer.onSessionInitialized(ctx)
124         return ctx
125     }
126
127     override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
128         val ctx = provider.createContext(bundleCtx)
129         provider.onSessionInitialized(ctx)
130         provider.onSessionInitiated(ctx as ProviderContext)
131         return ctx
132     }
133
134     private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
135         new OsgiConsumerContext(consumerCtx, this)
136     }
137
138     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
139         new OsgiProviderContext(providerCtx, this)
140     }
141
142     /**
143      * Returns a Managed Direct Proxy for supplied class
144      * 
145      * Managed direct proxy is a generated proxy class conforming to the supplied interface
146      * which delegates all calls to the backing delegate.
147      * 
148      * Proxy does not do any validation, null pointer checks or modifies data in any way, it
149      * is only use to avoid exposing direct references to backing implementation of service.
150      * 
151      * If proxy class does not exist for supplied service class it will be generated automatically.
152      */
153     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
154         var RpcProxyContext existing = null
155
156         if ((existing = managedProxies.get(service)) != null) {
157             return existing.proxy
158         }
159         return withLock(proxyGenerationLock) [ |
160             val maybeProxy = managedProxies.get(service);
161             if (maybeProxy !== null) {
162                 return maybeProxy.proxy;
163             }
164             
165             
166             val proxyInstance = generator.getDirectProxyFor(service)
167             val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
168             val properties = new Hashtable<String, String>()
169             rpcProxyCtx.proxy = proxyInstance as RpcService
170             properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
171             rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
172             managedProxies.put(service, rpcProxyCtx)
173             return rpcProxyCtx.proxy
174         ]
175     }
176
177     private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
178         try {
179             lock.lock();
180             val ret = method.call;
181             return ret;
182         } finally {
183             lock.unlock();
184         }
185     }
186
187     /**
188      * Registers RPC Implementation
189      * 
190      */
191     def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
192         Hashtable<String, String> properties) {
193         val proxy = getManagedDirectProxy(type)
194         checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
195
196         val osgiReg = context.bundleContext.registerService(type, service, properties);
197         proxy.delegate = service;
198         return new RpcServiceRegistrationImpl<T>(type, service, osgiReg, this);
199     }
200
201     def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
202         OsgiProviderContext context) {
203         val router = resolveRpcRouter(type);
204         checkState(router !== null)
205         return new RoutedRpcRegistrationImpl<T>(service, router, this)
206     }
207
208     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
209
210         val router = rpcRouters.get(type);
211         if (router !== null) {
212             return router as RpcRouter<T>;
213         }
214
215         // We created Router
216         return withLock(routerGenerationLock) [ |
217             val maybeRouter = rpcRouters.get(type);
218             if (maybeRouter !== null) {
219                 return maybeRouter as RpcRouter<T>;
220             }
221             
222             val newRouter = generator.getRouterFor(type);
223             checkState(newRouter !== null);
224             rpcRouters.put(type, newRouter);
225             // We create / update Direct Proxy for router
226             val proxy = getManagedDirectProxy(type);
227             proxy.delegate = newRouter.invocationProxy
228             return newRouter;
229         ]
230
231     }
232
233     protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
234         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
235
236         val router = registration.router;
237         val paths = registration.registeredPaths;
238
239         val routingTable = router.getRoutingTable(context)
240         checkState(routingTable != null);
241
242         // Updating internal structure of registration
243         routingTable.updateRoute(path, registration.instance)
244
245         // Update routing table / send announce to message bus
246         val success = paths.put(context, path);
247     }
248
249     protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
250         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
251
252         val router = registration.router;
253         val paths = registration.registeredPaths;
254
255         val routingTable = router.getRoutingTable(context)
256         checkState(routingTable != null);
257
258         // Updating internal structure of registration
259         val target = routingTable.getRoute(path)
260         checkState(target === registration.instance)
261         routingTable.removeRoute(path)
262         checkState(paths.remove(context, path));
263     }
264
265     protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
266
267         val router = registration.router;
268         val paths = registration.registeredPaths;
269
270         for (ctxMap : registration.registeredPaths.entries) {
271             val context = ctxMap.key
272             val routingTable = router.getRoutingTable(context)
273             val path = ctxMap.value
274             routingTable.removeRoute(path)
275         }
276     }
277
278     protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
279
280         val type = registration.serviceType;
281
282         val proxy = managedProxies.get(type);
283         if (proxy.proxy.delegate === registration.instance) {
284             proxy.proxy.delegate = null;
285         }
286     }
287
288     def createDelegate(Class<? extends RpcService> type) {
289         getManagedDirectProxy(type);
290     }
291
292     def getRpcRouters() {
293         return Collections.unmodifiableMap(rpcRouters);
294     }
295
296     override close() {
297         dataConsumerRegistration.unregister()
298         dataProviderRegistration.unregister()
299         notifyConsumerRegistration.unregister()
300         notifyProviderRegistration.unregister()
301     }
302
303 }
304
/**
 * Registration handle of a routed RPC implementation.
 *
 * Records the (routing context, path) pairs registered through it and
 * forwards path registration and removal to the owning broker.
 */
class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {

    /** Broker that performs the routing table updates. */
    @Property
    private val BindingAwareBrokerImpl broker

    /** Router backing this registration. */
    @Property
    private val RpcRouter<T> router

    /** Paths registered through this handle, grouped by routing context. */
    @Property
    private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create()

    /** Set once the registration has been removed. */
    private var closed = false

    new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
        super(instance)
        this._broker = broker
        this._router = backingRouter
    }

    /** Marks the registration closed and asks the broker to drop its routes. */
    override protected removeRegistration() {
        closed = true
        broker.unregisterRoutedRpcService(this)
    }

    /** Same as registerPath. */
    override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        this.registerPath(context, instance)
    }

    /** Same as unregisterPath. */
    override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        this.unregisterPath(context, instance)
    }

    /** Registers the path in the given context; fails if the registration was closed. */
    override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.registerPath(this, context, path)
    }

    /** Unregisters the path in the given context; fails if the registration was closed. */
    override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.unregisterPath(this, context, path)
    }

    /** Returns the service type reported by the backing router. */
    override getServiceType() {
        router.serviceType
    }

    /** Throws IllegalStateException once the registration has been closed. */
    private def checkClosed() {
        if (!closed) {
            return
        }
        throw new IllegalStateException("Registration was closed.")
    }

}
357
/**
 * Registration handle of a directly registered (non-routed) RPC implementation.
 *
 * Holds the OSGi service registration created for the implementation together
 * with the broker whose managed proxy delegates to it.
 */
class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {

    /** OSGi registration of the service; released when this registration is removed. */
    val ServiceRegistration<T> osgiRegistration;

    /** Owning broker; set to null once the registration has been removed. */
    private var BindingAwareBrokerImpl broker;

    @Property
    val Class<T> serviceType;

    public new(Class<T> type, T service, ServiceRegistration<T> osgiReg, BindingAwareBrokerImpl broker) {
        super(service);
        this._serviceType = type;
        this.osgiRegistration = osgiReg;
        this.broker = broker;
    }

    /**
     * Detaches the service from the broker's managed proxy and unregisters it
     * from the OSGi service registry.
     *
     * Calling this again after removal is a no-op.
     */
    override protected removeRegistration() {
        val owner = broker
        if (owner === null) {
            // Already removed.
            return
        }
        broker = null
        try {
            owner.unregisterRpcService(this)
        } finally {
            // Without this the service would stay advertised in the OSGi registry
            // after the registration was closed.
            releaseOsgiRegistration()
        }
    }

    /** Unregisters the OSGi service, tolerating a registration that is already gone. */
    private def void releaseOsgiRegistration() {
        try {
            osgiRegistration?.unregister()
        } catch (IllegalStateException e) {
            // The framework has already dropped the registration,
            // e.g. because the providing bundle was stopped.
        }
    }

}