Updated implementation of internal RPC Router for Binding-Aware Broker and added...
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingAwareBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer
11 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider
12 import org.opendaylight.yangtools.yang.binding.RpcService
13 import javassist.ClassPool
14 import org.osgi.framework.BundleContext
15 import java.util.Map
16 import java.util.HashMap
17 import javassist.LoaderClassPath
18 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker
19 import java.util.Hashtable
20 import static extension org.opendaylight.controller.sal.binding.codegen.RuntimeCodeHelper.*
21
22 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
23 import org.osgi.framework.ServiceRegistration
24 import static org.opendaylight.controller.sal.binding.impl.osgi.Constants.*
25 import static extension org.opendaylight.controller.sal.binding.impl.osgi.PropertiesUtils.*
26 import org.opendaylight.controller.sal.binding.api.NotificationService
27 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext
28
29 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration
33 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration
34 import org.opendaylight.controller.sal.binding.api.data.DataProviderService
35 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
36 import org.opendaylight.controller.sal.binding.spi.RpcRouter
37 import java.util.concurrent.ConcurrentHashMap
38 import static com.google.common.base.Preconditions.*
39 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
40 import org.opendaylight.yangtools.yang.binding.BaseIdentity
41 import com.google.common.collect.Multimap
42 import com.google.common.collect.HashMultimap
43 import static org.opendaylight.controller.sal.binding.impl.osgi.ClassLoaderUtils.*
44
45 class BindingAwareBrokerImpl implements BindingAwareBroker {
46     private static val log = LoggerFactory.getLogger(BindingAwareBrokerImpl)
47
48     private val clsPool = ClassPool.getDefault()
49     private var RuntimeCodeGenerator generator;
50
51     /**
52      * Map of all Managed Direct Proxies
53      * 
54      */
55     private val Map<Class<? extends RpcService>, RpcProxyContext> managedProxies = new ConcurrentHashMap();
56
57     /**
58      * 
59      * Map of all available Rpc Routers
60      * 
61      * 
62      */
63     private val Map<Class<? extends RpcService>, RpcRouter<? extends RpcService>> rpcRouters = new ConcurrentHashMap();
64
65     private var NotificationBrokerImpl notifyBroker
66     private var DataBrokerImpl dataBroker
67     private var ServiceRegistration<NotificationProviderService> notifyBrokerRegistration
68
69     @Property
70     var BundleContext brokerBundleContext
71
72     def start() {
73         initGenerator();
74
75         // Initialization of notificationBroker
76         notifyBroker = new NotificationBrokerImpl(null);
77         dataBroker = new DataBrokerImpl();
78         val brokerProperties = newProperties();
79         notifyBrokerRegistration = brokerBundleContext.registerService(NotificationProviderService, notifyBroker,
80             brokerProperties)
81         brokerBundleContext.registerService(NotificationService, notifyBroker, brokerProperties)
82         brokerBundleContext.registerService(DataProviderService, dataBroker, brokerProperties)
83         brokerBundleContext.registerService(DataBrokerService, dataBroker, brokerProperties)
84
85     }
86
87     def initGenerator() {
88
89         // YANG Binding Class Loader
90         clsPool.appendClassPath(new LoaderClassPath(RpcService.classLoader));
91         generator = new RuntimeCodeGenerator(clsPool);
92     }
93
94     override registerConsumer(BindingAwareConsumer consumer, BundleContext bundleCtx) {
95         val ctx = consumer.createContext(bundleCtx)
96         consumer.onSessionInitialized(ctx)
97         return ctx
98     }
99
100     override registerProvider(BindingAwareProvider provider, BundleContext bundleCtx) {
101         val ctx = provider.createContext(bundleCtx)
102         provider.onSessionInitialized(ctx)
103         provider.onSessionInitiated(ctx as ProviderContext)
104         return ctx
105     }
106
107     private def createContext(BindingAwareConsumer consumer, BundleContext consumerCtx) {
108         new OsgiConsumerContext(consumerCtx, this)
109     }
110
111     private def createContext(BindingAwareProvider provider, BundleContext providerCtx) {
112         new OsgiProviderContext(providerCtx, this)
113     }
114
115     /**
116      * Returns a Managed Direct Proxy for supplied class
117      * 
118      * Managed direct proxy is a generated proxy class conforming to the supplied interface
119      * which delegates all calls to the backing delegate.
120      * 
121      * Proxy does not do any validation, null pointer checks or modifies data in any way, it
122      * is only use to avoid exposing direct references to backing implementation of service.
123      * 
124      * If proxy class does not exist for supplied service class it will be generated automatically.
125      */
126     private def <T extends RpcService> getManagedDirectProxy(Class<T> service) {
127
128         var RpcProxyContext existing = null
129         if ((existing = managedProxies.get(service)) != null) {
130             return existing.proxy
131         }
132         val proxyInstance = generator.getDirectProxyFor(service)
133         val rpcProxyCtx = new RpcProxyContext(proxyInstance.class)
134         val properties = new Hashtable<String, String>()
135         rpcProxyCtx.proxy = proxyInstance as RpcService
136
137         properties.salServiceType = SAL_SERVICE_TYPE_CONSUMER_PROXY
138         rpcProxyCtx.registration = brokerBundleContext.registerService(service, rpcProxyCtx.proxy as T, properties)
139         managedProxies.put(service, rpcProxyCtx)
140         return rpcProxyCtx.proxy
141     }
142
143     /**
144      * Registers RPC Implementation
145      * 
146      */
147     def <T extends RpcService> registerRpcImplementation(Class<T> type, T service, OsgiProviderContext context,
148         Hashtable<String, String> properties) {
149         val proxy = getManagedDirectProxy(type)
150         checkState(proxy.delegate === null, "The Service for type {} is already registered", type)
151
152         val osgiReg = context.bundleContext.registerService(type, service, properties);
153         proxy.delegate = service;
154         return new RpcServiceRegistrationImpl<T>(type, service, osgiReg);
155     }
156
157     def <T extends RpcService> RpcRegistration<T> registerMountedRpcImplementation(Class<T> type, T service,
158         InstanceIdentifier<?> identifier, OsgiProviderContext context) {
159         throw new UnsupportedOperationException("TODO: auto-generated method stub")
160     }
161
162     def <T extends RpcService> RoutedRpcRegistration<T> registerRoutedRpcImplementation(Class<T> type, T service,
163         OsgiProviderContext context) {
164         val router = resolveRpcRouter(type);
165         checkState(router !== null)
166         return new RoutedRpcRegistrationImpl<T>(service, router, this)
167     }
168
169     private def <T extends RpcService> RpcRouter<T> resolveRpcRouter(Class<T> type) {
170
171         val router = rpcRouters.get(type);
172         if (router !== null) {
173             return router as RpcRouter<T>;
174         }
175
176         // We created Router
177         val newRouter = generator.getRouterFor(type);
178         checkState(newRouter !== null);
179         rpcRouters.put(type, newRouter);
180
181         // We create / update Direct Proxy for router
182         val proxy = getManagedDirectProxy(type);
183         proxy.delegate = newRouter.invocationProxy
184         return newRouter;
185
186     }
187
188     protected def <T extends RpcService> void registerPath(RoutedRpcRegistrationImpl<T> registration,
189         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
190
191         val router = registration.router;
192         val paths = registration.registeredPaths;
193
194         val routingTable = router.getRoutingTable(context)
195         checkState(routingTable != null);
196
197         // Updating internal structure of registration
198         routingTable.updateRoute(path, registration.instance)
199         val success = paths.put(context, path);
200     }
201
202     protected def <T extends RpcService> void unregisterPath(RoutedRpcRegistrationImpl<T> registration,
203         Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
204
205         val router = registration.router;
206         val paths = registration.registeredPaths;
207
208         val routingTable = router.getRoutingTable(context)
209         checkState(routingTable != null);
210
211         // Updating internal structure of registration
212         val target = routingTable.getRoute(path)
213         checkState(target === registration.instance)
214         routingTable.removeRoute(path)
215         checkState(paths.remove(context, path));
216     }
217
218     protected def <T extends RpcService> void unregisterRoutedRpcService(RoutedRpcRegistrationImpl<T> registration) {
219
220         val router = registration.router;
221         val paths = registration.registeredPaths;
222
223         for (ctxMap : registration.registeredPaths.entries) {
224             val context = ctxMap.key
225             val routingTable = router.getRoutingTable(context)
226             val path = ctxMap.value
227             routingTable.removeRoute(path)
228
229         }
230     }
231 }
232
233 class RoutedRpcRegistrationImpl<T extends RpcService> extends AbstractObjectRegistration<T> implements RoutedRpcRegistration<T> {
234
235     @Property
236     private val BindingAwareBrokerImpl broker;
237
238     @Property
239     private val RpcRouter<T> router;
240
241     @Property
242     private val Multimap<Class<? extends BaseIdentity>, InstanceIdentifier<?>> registeredPaths = HashMultimap.create();
243
244     private var closed = false;
245
246     new(T instance, RpcRouter<T> backingRouter, BindingAwareBrokerImpl broker) {
247         super(instance)
248         _router = backingRouter;
249         _broker = broker;
250     }
251
252     override protected removeRegistration() {
253         closed = true
254         broker.unregisterRoutedRpcService(this)
255     }
256
257     override registerInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
258         registerPath(context, instance);
259     }
260
261     override unregisterInstance(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> instance) {
262         unregisterPath(context, instance);
263     }
264
265     override getService() {
266         return instance;
267     }
268
269     override registerPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
270         checkClosed()
271         broker.registerPath(this, context, path);
272     }
273
274     override unregisterPath(Class<? extends BaseIdentity> context, InstanceIdentifier<? extends Object> path) {
275         checkClosed()
276         broker.unregisterPath(this, context, path);
277     }
278
279     private def checkClosed() {
280         if (closed)
281             throw new IllegalStateException("Registration was closed.");
282     }
283
284 }