Merge "Leafref and identityref types to Json"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.util.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45 import java.util.Collections
46 import org.opendaylight.yangtools.yang.binding.DataObject
47 import java.util.concurrent.locks.ReentrantLock
48 import java.util.concurrent.Callable
49 import java.util.WeakHashMap
50 import javax.annotation.concurrent.GuardedBy
51 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry
52
53 class BindingAwareBrokerImpl implements BindingAwareBroker, RpcProviderRegistry, AutoCloseable {
54     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
55
56     private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
57
58     private static val clsPool = ClassPool.getDefault()
59     public static var RuntimeCodeGenerator generator;
60
61     /**
62      * Map of all Managed Direct Proxies
63      * 
64      */
65     private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
66
67     /**
68      * 
69      * Map of all available Rpc Routers
70      * 
71      * 
72      */
73     private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new WeakHashMap();
74
75     @Property
76     private var NotificationProviderService notifyBroker
77
78     @Property
79     private var DataProviderService dataBroker
80
81     @Property
82     var BundleContext brokerBundleContext
83
84     ServiceRegistration<NotificationProviderService> notifyProviderRegistration
85
86     ServiceRegistration<NotificationService> notifyConsumerRegistration
87
88     ServiceRegistration<DataProviderService> dataProviderRegistration
89
90     ServiceRegistration<DataBrokerService> dataConsumerRegistration
91
92     private val proxyGenerationLock = new ReentrantLock;
93
94     private val routerGenerationLock = new ReentrantLock;
95
96     public new(BundleContext bundleContext) {
97         _brokerBundleContext = bundleContext;
98     }
99
100     def start() {
101         log.info("Starting MD-SAL: Binding Aware Broker");
102         initGenerator();
103
104         val executor = Executors.newCachedThreadPool;
105
106         // Initialization of notificationBroker
107         log.info("Starting MD-SAL: Binding Aware Notification Broker");
108
109         log.info("Starting MD-SAL: Binding Aware Data Broker");
110
111         log.info("Starting MD-SAL: Binding Aware Data Broker");
112         log.info("MD-SAL: Binding Aware Broker Started");
113     }
114
115     def initGenerator() {
116
117         // YANG Binding Class Loader
118         clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
119         generator = new RuntimeCodeGenerator(clsPool);
120     }
121
122     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
123         val ctx = consumer.createContext(bundleCtx)
124         consumer.onSessionInitialized(ctx)
125         return ctx
126     }
127
128     override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
129         val ctx = provider.createContext(bundleCtx)
130         provider.onSessionInitialized(ctx)
131         provider.onSessionInitiated(ctx as ProviderContext)
132         return ctx
133     }
134
135     private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
136         new OsgiConsumerContext(consumerCtx, this)
137     }
138
139     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
140         new OsgiProviderContext(providerCtx, this)
141     }
142
143     /**
144      * Returns a Managed Direct Proxy for supplied class
145      * 
146      * Managed direct proxy is a generated proxy class conforming to the supplied interface
147      * which delegates all calls to the backing delegate.
148      * 
149      * Proxy does not do any validation, null pointer checks or modifies data in any way, it
150      * is only use to avoid exposing direct references to backing implementation of service.
151      * 
152      * If proxy class does not exist for supplied service class it will be generated automatically.
153      */
154     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
155         var RpcProxyContext existing = null
156
157         if ((existing = managedProxies.get(service)) != null) {
158             return existing.proxy
159         }
160         return withLock(proxyGenerationLock) [ |
161             val maybeProxy = managedProxies.get(service);
162             if (maybeProxy !== null) {
163                 return maybeProxy.proxy;
164             }
165             
166             
167             val proxyInstance = generator.getDirectProxyFor(service)
168             val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
169             val properties = new Hashtable<String, String>()
170             rpcProxyCtx.proxy = proxyInstance as RpcService
171             properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
172             rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
173             managedProxies.put(service, rpcProxyCtx)
174             return rpcProxyCtx.proxy
175         ]
176     }
177
178     private static def <T> T withLock(ReentrantLock lock, Callable<T> method) {
179         try {
180             lock.lock();
181             val ret = method.call;
182             return ret;
183         } finally {
184             lock.unlock();
185         }
186     }
187
188     /**
189      * Registers RPC Implementation
190      * 
191      */
192     override <T extends RpcService> addRpcImplementation(Class<T> type, T service) {
193         checkNotNull(type, "Service type should not be null")
194         checkNotNull(service, "Service type should not be null")
195         
196         val proxy = getManagedDirectProxy(type)
197         checkState(proxy.delegate === null, "The Service for type %s is already registered", type)
198
199         proxy.delegate = service;
200         return new RpcServiceRegistrationImpl<T>(type, service, this);
201     }
202
203     override <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T service) {
204         checkNotNull(type, "Service type should not be null")
205         checkNotNull(service, "Service type should not be null")
206         
207         val router = resolveRpcRouter(type);
208         checkState(router !== null)
209         return new RoutedRpcRegistrationImpl<T>(service, router, this)
210     }
211     
212     override <T extends RpcService> getRpcService(Class<T> service) {
213         checkNotNull(service, "Service should not be null");
214         return getManagedDirectProxy(service) as T;
215     }
216
217     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
218
219         val router = rpcRouters.get(type);
220         if (router !== null) {
221             return router as RpcRouter<T>;
222         }
223
224         // We created Router
225         return withLock(routerGenerationLock) [ |
226             val maybeRouter = rpcRouters.get(type);
227             if (maybeRouter !== null) {
228                 return maybeRouter as RpcRouter<T>;
229             }
230             
231             val newRouter = generator.getRouterFor(type);
232             checkState(newRouter !== null);
233             rpcRouters.put(type, newRouter);
234             // We create / update Direct Proxy for router
235             val proxy = getManagedDirectProxy(type);
236             proxy.delegate = newRouter.invocationProxy
237             return newRouter;
238         ]
239
240     }
241
242     protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
243         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
244
245         val router = registration.router;
246         val paths = registration.registeredPaths;
247
248         val routingTable = router.getRoutingTable(context)
249         checkState(routingTable != null);
250
251         // Updating internal structure of registration
252         routingTable.updateRoute(path, registration.instance)
253
254         // Update routing table / send announce to message bus
255         val success = paths.put(context, path);
256     }
257
258     protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
259         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
260
261         val router = registration.router;
262         val paths = registration.registeredPaths;
263
264         val routingTable = router.getRoutingTable(context)
265         checkState(routingTable != null);
266
267         // Updating internal structure of registration
268         val target = routingTable.getRoute(path)
269         checkState(target === registration.instance)
270         routingTable.removeRoute(path)
271         checkState(paths.remove(context, path));
272     }
273
274     protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
275
276         val router = registration.router;
277         val paths = registration.registeredPaths;
278
279         for (ctxMap : registration.registeredPaths.entries) {
280             val context = ctxMap.key
281             val routingTable = router.getRoutingTable(context)
282             val path = ctxMap.value
283             routingTable.removeRoute(path)
284         }
285     }
286
287     protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
288
289         val type = registration.serviceType;
290
291         val proxy = managedProxies.get(type);
292         if (proxy.proxy.delegate === registration.instance) {
293             proxy.proxy.delegate = null;
294         }
295     }
296
297     def createDelegate(Class<? extends RpcService> type) {
298         getManagedDirectProxy(type);
299     }
300
301     def getRpcRouters() {
302         return Collections.unmodifiableMap(rpcRouters);
303     }
304
305     override close() {
306         dataConsumerRegistration.unregister()
307         dataProviderRegistration.unregister()
308         notifyConsumerRegistration.unregister()
309         notifyProviderRegistration.unregister()
310     }
311
312 }
313
314 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
315
316     @Property
317     private val BindingAwareBrokerImpl broker;
318
319     @Property
320     private val RpcRouter<T> router;
321
322     @Property
323     private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
324
325     private var closed = false;
326
327     new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
328         super(instance)
329         _router = backingRouter;
330         _broker = broker;
331     }
332
333     override protected removeRegistration() {
334         closed = true
335         broker.unregisterRoutedRpcService(this)
336     }
337
338     override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
339         registerPath(context, instance);
340     }
341
342     override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
343         unregisterPath(context, instance);
344     }
345
346     override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
347         checkClosed()
348         broker.registerPath(this, context, path);
349     }
350
351     override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
352         checkClosed()
353         broker.unregisterPath(this, context, path);
354     }
355
356     override getServiceType() {
357         return router.serviceType;
358     }
359
360     private def checkClosed() {
361         if (closed)
362             throw new IllegalStateException("Registration was closed.");
363     }
364
365 }
366
367 class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T> {
368
369     private var BindingAwareBrokerImpl broker;
370
371     @Property
372     val Class<T> serviceType;
373
374     public new(Class<T> type, T service, BindingAwareBrokerImpl broker) {
375         super(service);
376         this._serviceType = type;
377         this.broker = broker;
378     }
379
380     override protected removeRegistration() {
381         broker.unregisterRpcService(this);
382         broker = null;
383     }
384
385 }