Updated implementation of broker (data services, generated code), added Integration...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45
/**
 * OSGi-backed implementation of {@link BindingAwareBroker}.
 *
 * {@link #start()} creates the notification and data brokers and registers them as services
 * in {@code brokerBundleContext}. The broker also keeps one managed direct proxy and, for
 * routed RPCs, one {@link RpcRouter} per RPC service interface; both are generated on demand
 * through the {@link RuntimeCodeGenerator}.
 */
class BindingAwareBrokerImpl implements BindingAwareBroker {
    private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)

    /** Javassist class pool handed to the runtime code generator in {@link #initGenerator()}. */
    private val clsPool = ClassPool.getDefault()

    /** Generator of proxies, routers and invokers; assigned by {@link #initGenerator()}. */
    private var RuntimeCodeGenerator generator

    /**
     * Managed direct proxies, keyed by the RPC service interface they implement.
     */
    private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap()

    /**
     * RPC routers, keyed by the RPC service interface they route.
     */
    private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap()

    @Property
    private var NotificationBrokerImpl notifyBroker

    @Property
    private var DataBrokerImpl dataBroker

    /** Registration of the notification broker under NotificationProviderService. */
    private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration

    /** Bundle context in which the broker registers its own services and proxies. */
    @Property
    var BundleContext brokerBundleContext

    /**
     * Starts the broker.
     *
     * Initializes the code generator, creates the notification and data brokers on one shared
     * cached thread pool, and registers them in {@code brokerBundleContext} under the
     * NotificationProviderService, NotificationService, DataProviderService and
     * DataBrokerService interfaces, all with the same properties.
     *
     * NOTE(review): only the NotificationProviderService registration is retained; the other
     * three registrations and the executor are not stored anywhere in this class - confirm
     * whether shutdown is handled elsewhere.
     *
     * @return the result of the last service registration (DataBrokerService)
     */
    def start() {
        initGenerator()

        // One executor is shared by the notification broker and the data broker.
        val executor = Executors.newCachedThreadPool

        notifyBroker = new NotificationBrokerImpl(executor)
        notifyBroker.invokerFactory = generator.invokerFactory

        dataBroker = new DataBrokerImpl()
        dataBroker.executor = executor

        val brokerProperties = newProperties()
        notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
            brokerProperties)
        brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
        brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)

        // Must stay the last expression: it determines the inferred return type of start().
        brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
    }

    /**
     * Appends the class loader of {@link RpcService} (the YANG binding class loader) to the
     * class pool and creates the {@link RuntimeCodeGenerator} on top of that pool.
     *
     * @return the newly created generator
     */
    def initGenerator() {
        clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader))
        generator = new RuntimeCodeGenerator(clsPool)
    }

    /**
     * Creates an {@link OsgiConsumerContext} for the consumer, passes it to
     * {@code onSessionInitialized} and returns it.
     */
    override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
        val ctx = consumer.createContext(bundleCtx)
        consumer.onSessionInitialized(ctx)
        return ctx
    }

    /**
     * Creates an {@link OsgiProviderContext} for the provider, passes it to both
     * {@code onSessionInitialized} and {@code onSessionInitiated}, and returns it.
     */
    override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
        val ctx = provider.createContext(bundleCtx)
        provider.onSessionInitialized(ctx)
        provider.onSessionInitiated(ctx as ProviderContext)
        return ctx
    }

    /** Builds the consumer-side session context bound to this broker. */
    private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
        new OsgiConsumerContext(consumerCtx, this)
    }

    /** Builds the provider-side session context bound to this broker. */
    private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
        new OsgiProviderContext(providerCtx, this)
    }

    /**
     * Returns a Managed Direct Proxy for the supplied class.
     *
     * A managed direct proxy is a generated proxy class conforming to the supplied interface
     * which delegates all calls to the backing delegate.
     *
     * The proxy does not do any validation or null pointer checks and does not modify data
     * in any way; it is only used to avoid exposing direct references to the backing
     * implementation of a service.
     *
     * If no proxy exists yet for the supplied service class, one is generated, registered in
     * {@code brokerBundleContext} with the consumer-proxy service type, and cached.
     */
    private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
        val existing = managedProxies.get(service)
        if (existing !== null) {
            return existing.proxy
        }

        val proxyInstance = generator.getDirectProxyFor(service)
        val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
        rpcProxyCtx.proxy = proxyInstance as RpcService

        val properties = new Hashtable<String, String>()
        properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
        rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
        managedProxies.put(service, rpcProxyCtx)
        return rpcProxyCtx.proxy
    }

    /**
     * Registers an RPC implementation.
     *
     * The service is registered in the provider's bundle context and then installed as the
     * delegate of the managed direct proxy for {@code type}.
     *
     * @param type RPC service interface
     * @param service implementation to register
     * @param context provider context whose bundle context receives the registration
     * @param properties OSGi service properties for the registration
     * @return registration wrapping the OSGi service registration
     * @throws IllegalStateException if the proxy for {@code type} already has a delegate
     */
    def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
        Hashtable<String, String> properties) {
        val proxy = getManagedDirectProxy(type)

        // Guava's checkState substitutes only %s placeholders; "{}" would be left verbatim.
        checkState(proxy.delegate === null, "The Service for type %s is already registered", type)

        val osgiReg = context.bundleContext.registerService(type, service, properties)
        proxy.delegate = service
        return new RpcServiceRegistrationImpl<T>(type, service, osgiReg)
    }

    /**
     * Not implemented yet.
     *
     * @throws UnsupportedOperationException always
     */
    def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> type, T service,
        InstanceIdentifier<?> identifier, OsgiProviderContext context) {
        throw new UnsupportedOperationException("TODO: auto-generated method stub")
    }

    /**
     * Creates a routed registration for {@code service}, backed by the router for
     * {@code type} (generated on first use). No path is routed to the service until it is
     * registered on the returned registration.
     *
     * @throws IllegalStateException if no router could be resolved for {@code type}
     */
    def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
        OsgiProviderContext context) {
        val router = resolveRpcRouter(type)
        checkState(router !== null)
        return new RoutedRpcRegistrationImpl<T>(service, router, this)
    }

    /**
     * Returns the router for {@code type}, generating and caching it if none exists yet.
     * A newly generated router's invocation proxy becomes the delegate of the managed direct
     * proxy for {@code type}.
     */
    private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
        val router = rpcRouters.get(type)
        if (router !== null) {
            return router as RpcRouter<T>
        }

        // No router yet: generate one and remember it.
        val newRouter = generator.getRouterFor(type)
        checkState(newRouter !== null)
        rpcRouters.put(type, newRouter)

        // Create / update the direct proxy so that it delegates to the router.
        val proxy = getManagedDirectProxy(type)
        proxy.delegate = newRouter.invocationProxy
        return newRouter
    }

    /**
     * Routes {@code path} within routing {@code context} to the registration's instance and
     * records the pair in the registration.
     *
     * @throws IllegalStateException if the router has no routing table for {@code context}
     */
    protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {

        val router = registration.router
        val paths = registration.registeredPaths

        val routingTable = router.getRoutingTable(context)
        checkState(routingTable !== null)

        // Update the routing table so the path resolves to this registration's instance.
        routingTable.updateRoute(path, registration.instance)

        // Update the internal bookkeeping of the registration.
        paths.put(context, path)
    }

    /**
     * Removes the route for {@code path} within routing {@code context} and drops the pair
     * from the registration's bookkeeping.
     *
     * @throws IllegalStateException if the router has no routing table for {@code context},
     *         if the current route target is not the registration's instance, or if the pair
     *         was not recorded in the registration
     */
    protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
        Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {

        val router = registration.router
        val paths = registration.registeredPaths

        val routingTable = router.getRoutingTable(context)
        checkState(routingTable !== null)

        // Only the registration that currently owns the route may remove it.
        val target = routingTable.getRoute(path)
        checkState(target === registration.instance)
        routingTable.removeRoute(path)
        checkState(paths.remove(context, path))
    }

    /**
     * Removes from the router every route recorded in the registration.
     */
    protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {

        val router = registration.router

        for (ctxMap : registration.registeredPaths.entries) {
            val context = ctxMap.key
            val routingTable = router.getRoutingTable(context)
            val path = ctxMap.value
            routingTable.removeRoute(path)
        }
    }

    /**
     * Returns the managed direct proxy for {@code type}, creating it if necessary.
     */
    def createDelegate(Class<? extends RpcService> type) {
        getManagedDirectProxy(type)
    }

}
250
/**
 * Registration handle for a routed RPC implementation.
 *
 * Keeps the (routing context, path) pairs registered through it and forwards every path
 * change to the owning {@link BindingAwareBrokerImpl}. Once the registration has been
 * removed, any further path change is rejected with an {@link IllegalStateException}.
 */
class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {

    /** Broker that performs the actual routing table updates. */
    @Property
    private val BindingAwareBrokerImpl broker

    /** Router backing this registration. */
    @Property
    private val RpcRouter<T> router

    /** Paths registered through this registration, grouped by routing context. */
    @Property
    private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create()

    /** Set once the registration has been removed. */
    private var closed = false

    new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
        super(instance)
        _broker = broker
        _router = backingRouter
    }

    /** Marks the registration closed, then asks the broker to drop all of its routes. */
    override protected removeRegistration() {
        closed = true
        broker.unregisterRoutedRpcService(this)
    }

    /** Same as {@link #registerPath}. */
    override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        registerPath(context, instance)
    }

    /** Same as {@link #unregisterPath}. */
    override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
        unregisterPath(context, instance)
    }

    /** Returns the registered service instance. */
    override getService() {
        instance
    }

    /**
     * Routes {@code path} within {@code context} to the registered service.
     *
     * @throws IllegalStateException if the registration was already closed
     */
    override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.registerPath(this, context, path)
    }

    /**
     * Removes the route for {@code path} within {@code context}.
     *
     * @throws IllegalStateException if the registration was already closed
     */
    override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
        checkClosed()
        broker.unregisterPath(this, context, path)
    }

    /** Fails with an IllegalStateException when the registration has already been closed. */
    private def checkClosed() {
        checkState(!closed, "Registration was closed.")
    }

}