740ae887b0588bcc7648aaba49bb7f63eef5b53a
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
44 import java.util.concurrent.Executors
45
46 class BindingAwareBrokerImpl implements BindingAwareBroker {
47     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
48
49     private val clsPool = ClassPool.getDefault()
50     private var RuntimeCodeGenerator generator;
51
52     /**
53      * Map of all Managed Direct Proxies
54      * 
55      */
56     private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
57
58     /**
59      * 
60      * Map of all available Rpc Routers
61      * 
62      * 
63      */
64     private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
65
66     private var NotificationBrokerImpl notifyBroker
67     private var DataBrokerImpl dataBroker
68     private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
69
70     @Property
71     var BundleContext brokerBundleContext
72
73     def start() {
74         initGenerator();
75
76         val executor = Executors.newCachedThreadPool;
77         // Initialization of notificationBroker
78         notifyBroker = new NotificationBrokerImpl(executor);
79         notifyBroker.invokerFactory = generator.invokerFactory;
80         dataBroker = new DataBrokerImpl();
81         val brokerProperties = newProperties();
82         notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
83             brokerProperties)
84         brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
85         brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
86         brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
87
88     }
89
90     def initGenerator() {
91
92         // YANG Binding Class Loader
93         clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
94         generator = new RuntimeCodeGenerator(clsPool);
95     }
96
97     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
98         val ctx = consumer.createContext(bundleCtx)
99         consumer.onSessionInitialized(ctx)
100         return ctx
101     }
102
103     override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
104         val ctx = provider.createContext(bundleCtx)
105         provider.onSessionInitialized(ctx)
106         provider.onSessionInitiated(ctx as ProviderContext)
107         return ctx
108     }
109
110     private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
111         new OsgiConsumerContext(consumerCtx, this)
112     }
113
114     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
115         new OsgiProviderContext(providerCtx, this)
116     }
117
118     /**
119      * Returns a Managed Direct Proxy for supplied class
120      * 
121      * Managed direct proxy is a generated proxy class conforming to the supplied interface
122      * which delegates all calls to the backing delegate.
123      * 
124      * Proxy does not do any validation, null pointer checks or modifies data in any way, it
125      * is only use to avoid exposing direct references to backing implementation of service.
126      * 
127      * If proxy class does not exist for supplied service class it will be generated automatically.
128      */
129     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
130
131         var RpcProxyContext existing = null
132         if ((existing = managedProxies.get(service)) != null) {
133             return existing.proxy
134         }
135         val proxyInstance = generator.getDirectProxyFor(service)
136         val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
137         val properties = new Hashtable<String, String>()
138         rpcProxyCtx.proxy = proxyInstance as RpcService
139
140         properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
141         rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
142         managedProxies.put(service, rpcProxyCtx)
143         return rpcProxyCtx.proxy
144     }
145
146     /**
147      * Registers RPC Implementation
148      * 
149      */
150     def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
151         Hashtable<String, String> properties) {
152         val proxy = getManagedDirectProxy(type)
153         checkState(proxy.delegate === null, "The Service for type {} is already registered", type)
154
155         val osgiReg = context.bundleContext.registerService(type, service, properties);
156         proxy.delegate = service;
157         return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
158     }
159
160     def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> type, T service,
161         InstanceIdentifier<?> identifier, OsgiProviderContext context) {
162         throw new UnsupportedOperationException("TODO: auto-generated method stub")
163     }
164
165     def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
166         OsgiProviderContext context) {
167         val router = resolveRpcRouter(type);
168         checkState(router !== null)
169         return new RoutedRpcRegistrationImpl<T>(service, router, this)
170     }
171
172     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
173
174         val router = rpcRouters.get(type);
175         if (router !== null) {
176             return router as RpcRouter<T>;
177         }
178
179         // We created Router
180         val newRouter = generator.getRouterFor(type);
181         checkState(newRouter !== null);
182         rpcRouters.put(type, newRouter);
183
184         // We create / update Direct Proxy for router
185         val proxy = getManagedDirectProxy(type);
186         proxy.delegate = newRouter.invocationProxy
187         return newRouter;
188
189     }
190
191     protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
192         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
193
194         val router = registration.router;
195         val paths = registration.registeredPaths;
196
197         val routingTable = router.getRoutingTable(context)
198         checkState(routingTable != null);
199
200         // Updating internal structure of registration
201         routingTable.updateRoute(path, registration.instance)
202         val success = paths.put(context, path);
203     }
204
205     protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
206         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
207
208         val router = registration.router;
209         val paths = registration.registeredPaths;
210
211         val routingTable = router.getRoutingTable(context)
212         checkState(routingTable != null);
213
214         // Updating internal structure of registration
215         val target = routingTable.getRoute(path)
216         checkState(target === registration.instance)
217         routingTable.removeRoute(path)
218         checkState(paths.remove(context, path));
219     }
220
221     protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
222
223         val router = registration.router;
224         val paths = registration.registeredPaths;
225
226         for (ctxMap : registration.registeredPaths.entries) {
227             val context = ctxMap.key
228             val routingTable = router.getRoutingTable(context)
229             val path = ctxMap.value
230             routingTable.removeRoute(path)
231
232         }
233     }
234 }
235
236 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
237
238     @Property
239     private val BindingAwareBrokerImpl broker;
240
241     @Property
242     private val RpcRouter<T> router;
243
244     @Property
245     private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
246
247     private var closed = false;
248
249     new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
250         super(instance)
251         _router = backingRouter;
252         _broker = broker;
253     }
254
255     override protected removeRegistration() {
256         closed = true
257         broker.unregisterRoutedRpcService(this)
258     }
259
260     override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
261         registerPath(context, instance);
262     }
263
264     override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
265         unregisterPath(context, instance);
266     }
267
268     override getService() {
269         return instance;
270     }
271
272     override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
273         checkClosed()
274         broker.registerPath(this, context, path);
275     }
276
277     override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
278         checkClosed()
279         broker.unregisterPath(this, context, path);
280     }
281
282     private def checkClosed() {
283         if (closed)
284             throw new IllegalStateException("Registration was closed.");
285     }
286
287 }