Base integration of Binding Aware Broker with Config Subsystem
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45 import java.util.Collections
46 import org.opendaylight.yangtools.yang.binding.DataObject
47
48 class BindingAwareBrokerImpl implements BindingAwareBroker, AutoCloseable {
49     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
50
51
52     private InstanceIdentifier<? extends DataObject> root = InstanceIdentifier.builder().toInstance();
53
54     private val clsPool = ClassPool.getDefault()
55     private var RuntimeCodeGenerator generator;
56     
57
58     /**
59      * Map of all Managed Direct Proxies
60      * 
61      */
62     private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
63
64     /**
65      * 
66      * Map of all available Rpc Routers
67      * 
68      * 
69      */
70     private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
71
72     @Property
73     private var NotificationBrokerImpl notifyBroker
74     
75     @Property
76     private var DataBrokerImpl dataBroker
77     
78     @Property
79     var BundleContext brokerBundleContext
80     
81     ServiceRegistration<NotificationProviderService> notifyProviderRegistration
82     
83     ServiceRegistration<NotificationService> notifyConsumerRegistration
84     
85     ServiceRegistration<DataProviderService> dataProviderRegistration
86     
87     ServiceRegistration<DataBrokerService> dataConsumerRegistration
88     
89     private HashMapDataStore store = new HashMapDataStore();
90     
91     public new(BundleContext bundleContext) {
92         _brokerBundleContext = bundleContext;
93     }
94
95     def start() {
96         log.info("Starting MD-SAL: Binding Aware Broker");
97         initGenerator();
98
99         val executor = Executors.newCachedThreadPool;
100         // Initialization of notificationBroker
101         log.info("Starting MD-SAL: Binding Aware Notification Broker");
102         notifyBroker = new NotificationBrokerImpl(executor);
103         notifyBroker.invokerFactory = generator.invokerFactory;
104
105         log.info("Starting MD-SAL: Binding Aware Data Broker");
106         dataBroker = new DataBrokerImpl();
107         dataBroker.executor = executor;
108
109         val brokerProperties = newProperties();
110         
111         
112         log.info("Starting MD-SAL: Binding Aware Data Broker");
113         notifyProviderRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
114             brokerProperties)
115         notifyConsumerRegistration = brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
116         dataProviderRegistration = brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
117         dataConsumerRegistration = brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
118         
119         
120         getDataBroker().registerDataReader(root, store);
121         getDataBroker().registerCommitHandler(root, store)
122         
123         log.info("MD-SAL: Binding Aware Broker Started");
124     }
125
126     def initGenerator() {
127
128         // YANG Binding Class Loader
129         clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
130         generator = new RuntimeCodeGenerator(clsPool);
131     }
132
133     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
134         val ctx = consumer.createContext(bundleCtx)
135         consumer.onSessionInitialized(ctx)
136         return ctx
137     }
138
139     override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
140         val ctx = provider.createContext(bundleCtx)
141         provider.onSessionInitialized(ctx)
142         provider.onSessionInitiated(ctx as ProviderContext)
143         return ctx
144     }
145
146     private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
147         new OsgiConsumerContext(consumerCtx, this)
148     }
149
150     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
151         new OsgiProviderContext(providerCtx, this)
152     }
153
154     /**
155      * Returns a Managed Direct Proxy for supplied class
156      * 
157      * Managed direct proxy is a generated proxy class conforming to the supplied interface
158      * which delegates all calls to the backing delegate.
159      * 
160      * Proxy does not do any validation, null pointer checks or modifies data in any way, it
161      * is only use to avoid exposing direct references to backing implementation of service.
162      * 
163      * If proxy class does not exist for supplied service class it will be generated automatically.
164      */
165     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
166
167         var RpcProxyContext existing = null
168         if ((existing = managedProxies.get(service)) != null) {
169             return existing.proxy
170         }
171         val proxyInstance = generator.getDirectProxyFor(service)
172         val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
173         val properties = new Hashtable<String, String>()
174         rpcProxyCtx.proxy = proxyInstance as RpcService
175
176         properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
177         rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
178         managedProxies.put(service, rpcProxyCtx)
179         return rpcProxyCtx.proxy
180     }
181
182     /**
183      * Registers RPC Implementation
184      * 
185      */
186     def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
187         Hashtable<String, String> properties) {
188         val proxy = getManagedDirectProxy(type)
189         checkState(proxy.delegate === null, "The Service for type {} is already registered", type)
190
191         val osgiReg = context.bundleContext.registerService(type, service, properties);
192         proxy.delegate = service;
193         return new RpcServiceRegistrationImpl<T>(type, service, osgiReg,this);
194     }
195
196     def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
197         OsgiProviderContext context) {
198         val router = resolveRpcRouter(type);
199         checkState(router !== null)
200         return new RoutedRpcRegistrationImpl<T>(service, router, this)
201     }
202
203     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
204
205         val router = rpcRouters.get(type);
206         if (router !== null) {
207             return router as RpcRouter<T>;
208         }
209
210         // We created Router
211         val newRouter = generator.getRouterFor(type);
212         checkState(newRouter !== null);
213         rpcRouters.put(type, newRouter);
214
215         // We create / update Direct Proxy for router
216         val proxy = getManagedDirectProxy(type);
217         proxy.delegate = newRouter.invocationProxy
218         return newRouter;
219
220     }
221
222     protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
223         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
224
225         val router = registration.router;
226         val paths = registration.registeredPaths;
227
228         val routingTable = router.getRoutingTable(context)
229         checkState(routingTable != null);
230
231         // Updating internal structure of registration
232         routingTable.updateRoute(path, registration.instance)
233         // Update routing table / send announce to message bus
234         
235         val success = paths.put(context, path);
236     }
237
238     protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
239         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
240
241         val router = registration.router;
242         val paths = registration.registeredPaths;
243
244         val routingTable = router.getRoutingTable(context)
245         checkState(routingTable != null);
246
247         // Updating internal structure of registration
248         val target = routingTable.getRoute(path)
249         checkState(target === registration.instance)
250         routingTable.removeRoute(path)
251         checkState(paths.remove(context, path));
252     }
253
254     protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
255
256         val router = registration.router;
257         val paths = registration.registeredPaths;
258
259         for (ctxMap : registration.registeredPaths.entries) {
260             val context = ctxMap.key
261             val routingTable = router.getRoutingTable(context)
262             val path = ctxMap.value
263             routingTable.removeRoute(path)
264         }
265     }
266     
267     protected def <T extends RpcService> void unregisterRpcService(RpcServiceRegistrationImpl<T> registration) {
268
269         val type = registration.serviceType;
270         
271         val proxy = managedProxies.get(type);
272         if(proxy.proxy.delegate === registration.instance) {
273             proxy.proxy.delegate = null;
274         }
275     }
276     
277     def createDelegate(Class<? extends RpcService> type) {
278         getManagedDirectProxy(type);
279     }
280     
281     def getRpcRouters() {
282         return Collections.unmodifiableMap(rpcRouters);
283     }
284     
285     override close() {
286         dataConsumerRegistration.unregister()
287         dataProviderRegistration.unregister()
288         notifyConsumerRegistration.unregister()
289         notifyProviderRegistration.unregister()
290     }
291     
292 }
293
294 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
295
296     @Property
297     private val BindingAwareBrokerImpl broker;
298
299     @Property
300     private val RpcRouter<T> router;
301
302     @Property
303     private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
304
305     private var closed = false;
306
307     new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
308         super(instance)
309         _router = backingRouter;
310         _broker = broker;
311     }
312
313     override protected removeRegistration() {
314         closed = true
315         broker.unregisterRoutedRpcService(this)
316     }
317
318     override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
319         registerPath(context, instance);
320     }
321
322     override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
323         unregisterPath(context, instance);
324     }
325
326     override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
327         checkClosed()
328         broker.registerPath(this, context, path);
329     }
330
331     override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
332         checkClosed()
333         broker.unregisterPath(this, context, path);
334     }
335     
336     override getServiceType() {
337         return router.serviceType;
338     }
339
340     private def checkClosed() {
341         if (closed)
342             throw new IllegalStateException("Registration was closed.");
343     }
344
345 }
346 class RpcServiceRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RpcRegistration<T>  {
347
348     val ServiceRegistration<T> osgiRegistration;
349     private var BindingAwareBrokerImpl broker;
350     
351     @Property
352     val Class<T> serviceType;
353
354     public new(Class<T> type, T service, ServiceRegistration<T> osgiReg,BindingAwareBrokerImpl broker) {
355         super(service);
356         this._serviceType = type;
357         this.osgiRegistration = osgiReg;
358         this.broker= broker;
359     }
360
361     override protected removeRegistration() {
362         broker.unregisterRpcService(this);
363         broker = null;
364     }
365     
366 }