32eff18d4a3644069cec3d8cfce0bbdff7fb5c36
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-binding-broker-impl / src / main / java / org / opendaylight / controller / sal / binding / impl / BindingBrokerImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.HashMap;
13 import java.util.HashSet;
14 import java.util.Map;
15 import java.util.Set;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Future;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
24 import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
25 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
26 import org.opendaylight.controller.sal.binding.api.BindingAwareService;
27 import org.opendaylight.controller.sal.binding.spi.Mapper;
28 import org.opendaylight.controller.sal.binding.spi.MappingProvider;
29 import org.opendaylight.controller.sal.binding.spi.RpcMapper;
30 import org.opendaylight.controller.sal.binding.spi.RpcMapper.RpcProxyInvocationHandler;
31 import org.opendaylight.controller.sal.binding.spi.SALBindingModule;
32 import org.opendaylight.controller.sal.common.util.Rpcs;
33 import org.opendaylight.controller.sal.core.api.Provider;
34 import org.opendaylight.controller.sal.core.api.RpcImplementation;
35 import org.opendaylight.controller.yang.binding.DataObject;
36 import org.opendaylight.controller.yang.binding.RpcService;
37 import org.opendaylight.controller.yang.common.QName;
38 import org.opendaylight.controller.yang.common.RpcResult;
39 import org.opendaylight.controller.yang.data.api.CompositeNode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 public class BindingBrokerImpl implements BindingAwareBroker {
44
45     private static Logger log = LoggerFactory
46             .getLogger(BindingBrokerImpl.class);
47
48     private Set<ConsumerSessionImpl> sessions = new HashSet<ConsumerSessionImpl>();
49     private Set<ProviderSessionImpl> providerSessions = new HashSet<ProviderSessionImpl>();
50
51     private Set<SALBindingModule> modules = new HashSet<SALBindingModule>();
52     private Map<Class<? extends BindingAwareService>, SALBindingModule> salServiceProviders = new HashMap<Class<? extends BindingAwareService>, SALBindingModule>();
53     private MappingProvider mapping;
54     private BIFacade biFacade = new BIFacade();
55     private org.opendaylight.controller.sal.core.api.Broker.ProviderSession biSession;
56     private ExecutorService executor;
57
58     Map<Class<? extends RpcService>, RpcService> rpcImpls = Collections
59             .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());
60
61     private RpcProxyInvocationHandler rpcProxyHandler = new RpcProxyInvocationHandler() {
62
63         @Override
64         public Future<RpcResult<? extends DataObject>> invokeRpc(
65                 RpcService proxy, QName rpc, DataObject input) {
66             return rpcProxyInvoked(proxy, rpc, input);
67         }
68     };
69
70     @Override
71     public ConsumerSession registerConsumer(BindingAwareConsumer consumer) {
72         checkPredicates(consumer);
73         log.info("Registering consumer " + consumer);
74
75         ConsumerSessionImpl session = newSessionFor(consumer);
76         consumer.onSessionInitialized(session);
77
78         sessions.add(session);
79         return session;
80     }
81
82     @Override
83     public ProviderSession registerProvider(BindingAwareProvider provider) {
84         checkPredicates(provider);
85
86         ProviderSessionImpl session = newSessionFor(provider);
87         provider.onSessionInitiated(session);
88
89         providerSessions.add(session);
90         return session;
91     }
92
93     public void addModule(SALBindingModule module) {
94         log.info("Registering broker module " + module);
95         if (modules.contains(module)) {
96             log.error("Module already registered");
97             throw new IllegalArgumentException("Module already exists.");
98         }
99
100         Set<Class<? extends BindingAwareService>> provServices = module
101                 .getProvidedServices();
102         for (Class<? extends BindingAwareService> serviceType : provServices) {
103             log.info("  Registering session service implementation: "
104                     + serviceType.getCanonicalName());
105             salServiceProviders.put(serviceType, module);
106         }
107     }
108
109     public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
110         sessions.remove(consumerSessionImpl);
111         providerSessions.remove(consumerSessionImpl);
112     }
113
114     private void checkPredicates(BindingAwareProvider prov) {
115         if (prov == null)
116             throw new IllegalArgumentException("Provider should not be null.");
117         for (ProviderSessionImpl session : providerSessions) {
118             if (prov.equals(session.getProvider()))
119                 throw new IllegalStateException("Provider already registered");
120         }
121
122     }
123
124     private void checkPredicates(BindingAwareConsumer cons) {
125         if (cons == null)
126             throw new IllegalArgumentException("Consumer should not be null.");
127         for (ConsumerSessionImpl session : sessions) {
128             if (cons.equals(session.getConsumer()))
129                 throw new IllegalStateException("Consumer already registered");
130         }
131     }
132
133     private ConsumerSessionImpl newSessionFor(BindingAwareConsumer cons) {
134         return new ConsumerSessionImpl(cons);
135     }
136
137     private ProviderSessionImpl newSessionFor(BindingAwareProvider provider) {
138         return new ProviderSessionImpl(provider);
139     }
140
141     private <T extends BindingAwareService> T newSALServiceForSession(
142             Class<T> service, ConsumerSession session) {
143
144         SALBindingModule serviceProvider = salServiceProviders.get(service);
145         if (serviceProvider == null) {
146             return null;
147         }
148         return serviceProvider.getServiceForSession(service, session);
149
150     }
151
152     private <T extends RpcService> T newRpcProxyForSession(Class<T> service) {
153
154         RpcMapper<T> mapper = mapping.rpcMapperForClass(service);
155         if (mapper == null) {
156             log.error("Mapper for " + service + "is unavailable.");
157             return null;
158         }
159         T proxy = mapper.getConsumerProxy(rpcProxyHandler);
160
161         return proxy;
162     }
163
164     private Future<RpcResult<? extends DataObject>> rpcProxyInvoked(
165             RpcService rpcProxy, QName rpcType, DataObject inputData) {
166         if (rpcProxy == null) {
167             throw new IllegalArgumentException("Proxy must not be null");
168         }
169         if (rpcType == null) {
170             throw new IllegalArgumentException(
171                     "rpcType (QName) should not be null");
172         }
173         Future<RpcResult<? extends DataObject>> ret = null;
174
175         // Real invocation starts here
176         RpcMapper<? extends RpcService> mapper = mapping
177                 .rpcMapperForProxy(rpcProxy);
178         RpcService impl = rpcImpls.get(mapper.getServiceClass());
179
180         if (impl == null) {
181             // RPC is probably remote
182             CompositeNode inputNode = null;
183             Mapper<? extends DataObject> inputMapper = mapper.getInputMapper();
184             if (inputMapper != null) {
185                 inputNode = inputMapper.domFromObject(inputData);
186             }
187             Future<RpcResult<CompositeNode>> biResult = biSession.rpc(rpcType,
188                     inputNode);
189             ret = new TranslatedFuture(biResult, mapper);
190
191         } else {
192             // RPC is local
193             Callable<RpcResult<? extends DataObject>> invocation = localRpcCallableFor(
194                     impl, mapper, rpcType, inputData);
195             ret = executor.submit(invocation);
196         }
197         return ret;
198     }
199
200     private Callable<RpcResult<? extends DataObject>> localRpcCallableFor(
201             final RpcService impl,
202             final RpcMapper<? extends RpcService> mapper, final QName rpcType,
203             final DataObject inputData) {
204
205         return new Callable<RpcResult<? extends DataObject>>() {
206
207             @Override
208             public RpcResult<? extends DataObject> call() throws Exception {
209                 return mapper.invokeRpcImplementation(rpcType, impl, inputData);
210             }
211         };
212     }
213
214     // Binding Independent invocation of Binding Aware RPC
215     private RpcResult<CompositeNode> invokeLocalRpc(QName rpc,
216             CompositeNode inputNode) {
217         RpcMapper<? extends RpcService> mapper = mapping.rpcMapperForData(rpc,
218                 inputNode);
219
220         DataObject inputTO = mapper.getInputMapper().objectFromDom(inputNode);
221
222         RpcService impl = rpcImpls.get(mapper.getServiceClass());
223         if (impl == null) {
224             log.warn("Implementation for rpc: " + rpc + "not available.");
225         }
226         RpcResult<? extends DataObject> result = mapper
227                 .invokeRpcImplementation(rpc, impl, inputTO);
228         DataObject outputTO = result.getResult();
229
230         CompositeNode outputNode = null;
231         if (outputTO != null) {
232             outputNode = mapper.getOutputMapper().domFromObject(outputTO);
233         }
234         return Rpcs.getRpcResult(result.isSuccessful(), outputNode,
235                 result.getErrors());
236     }
237
238     private class ConsumerSessionImpl implements
239             BindingAwareBroker.ConsumerSession {
240
241         private final BindingAwareConsumer consumer;
242         private Map<Class<? extends BindingAwareService>, BindingAwareService> sessionSalServices = Collections
243                 .synchronizedMap(new HashMap<Class<? extends BindingAwareService>, BindingAwareService>());
244
245         private Map<Class<? extends RpcService>, RpcService> sessionRpcProxies = Collections
246                 .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());
247
248         public ConsumerSessionImpl(BindingAwareConsumer cons) {
249             this.consumer = cons;
250         }
251
252         @Override
253         public <T extends BindingAwareService> T getSALService(Class<T> service) {
254
255             BindingAwareService serv = sessionSalServices.get(service);
256             if (serv != null) {
257                 if (service.isInstance(serv)) {
258                     @SuppressWarnings("unchecked")
259                     T ret = (T) serv;
260                     return ret;
261                 } else {
262                     log.error("Implementation for service " + service.getName()
263                             + " does not implement the service interface");
264                     throw new IllegalStateException("Service implementation "
265                             + serv.getClass().getName() + "does not implement "
266                             + service.getName());
267                 }
268             } else {
269                 T ret = BindingBrokerImpl.this.newSALServiceForSession(service,
270                         this);
271                 if (ret != null) {
272                     sessionSalServices.put(service, ret);
273                 }
274                 return ret;
275             }
276         }
277
278         @Override
279         public <T extends RpcService> T getRpcService(Class<T> service) {
280             RpcService current = sessionRpcProxies.get(service);
281             if (current != null) {
282                 if (service.isInstance(current)) {
283                     @SuppressWarnings("unchecked")
284                     T ret = (T) current;
285                     return ret;
286                 } else {
287                     log.error("Proxy  for rpc service " + service.getName()
288                             + " does not implement the service interface");
289                     throw new IllegalStateException("Service implementation "
290                             + current.getClass().getName()
291                             + "does not implement " + service.getName());
292                 }
293             } else {
294                 T ret = BindingBrokerImpl.this.newRpcProxyForSession(service);
295                 if (ret != null) {
296                     sessionRpcProxies.put(service, ret);
297                 }
298                 return ret;
299             }
300         }
301
302         public BindingAwareConsumer getConsumer() {
303             return this.consumer;
304         }
305
306     }
307
308     private class ProviderSessionImpl extends ConsumerSessionImpl implements
309             BindingAwareBroker.ProviderSession {
310
311         private final BindingAwareProvider provider;
312
313         public ProviderSessionImpl(BindingAwareProvider provider2) {
314             super(null);
315             this.provider = provider2;
316         }
317
318         @Override
319         public void addRpcImplementation(RpcService implementation) {
320             if (implementation == null) {
321                 throw new IllegalArgumentException(
322                         "Implementation should not be null");
323             }
324             // TODO Implement this method
325             throw new UnsupportedOperationException("Not implemented");
326         }
327
328         @Override
329         public void removeRpcImplementation(RpcService implementation) {
330             if (implementation == null) {
331                 throw new IllegalArgumentException(
332                         "Implementation should not be null");
333             }
334             // TODO Implement this method
335             throw new UnsupportedOperationException("Not implemented");
336         }
337
338         public BindingAwareProvider getProvider() {
339             return this.provider;
340         }
341
342     }
343
344     private class BIFacade implements Provider,RpcImplementation {
345
346         @Override
347         public Set<QName> getSupportedRpcs() {
348             return Collections.emptySet();
349         }
350
351         @Override
352         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
353             if (rpc == null) {
354                 throw new IllegalArgumentException(
355                         "Rpc type should not be null");
356             }
357
358             return BindingBrokerImpl.this.invokeLocalRpc(rpc, input);
359         }
360
361         @Override
362         public void onSessionInitiated(
363                 org.opendaylight.controller.sal.core.api.Broker.ProviderSession session) {
364             
365             BindingBrokerImpl.this.biSession = session;
366             for (SALBindingModule module : modules) {
367                 try {
368                     module.onBISessionAvailable(biSession);
369                 } catch(Exception e) {
370                     log.error("Module " +module +" throwed unexpected exception",e);
371                 }
372             }
373         }
374
375         @Override
376         public Collection<ProviderFunctionality> getProviderFunctionality() {
377             return Collections.emptySet();
378         }
379
380     }
381
382     private static class TranslatedFuture implements
383             Future<RpcResult<? extends DataObject>> {
384         private final Future<RpcResult<CompositeNode>> realFuture;
385         private final RpcMapper<?> mapper;
386
387         public TranslatedFuture(Future<RpcResult<CompositeNode>> future,
388                 RpcMapper<?> mapper) {
389             realFuture = future;
390             this.mapper = mapper;
391         }
392
393         @Override
394         public boolean cancel(boolean mayInterruptIfRunning) {
395             return realFuture.cancel(mayInterruptIfRunning);
396         }
397
398         @Override
399         public boolean isCancelled() {
400             return realFuture.isCancelled();
401         }
402
403         @Override
404         public boolean isDone() {
405             return realFuture.isDone();
406         }
407
408         @Override
409         public RpcResult<? extends DataObject> get()
410                 throws InterruptedException, ExecutionException {
411             RpcResult<CompositeNode> val = realFuture.get();
412             return tranlate(val);
413         }
414
415         @Override
416         public RpcResult<? extends DataObject> get(long timeout, TimeUnit unit)
417                 throws InterruptedException, ExecutionException,
418                 TimeoutException {
419             RpcResult<CompositeNode> val = realFuture.get(timeout, unit);
420             return tranlate(val);
421         }
422
423         private RpcResult<? extends DataObject> tranlate(
424                 RpcResult<CompositeNode> result) {
425             CompositeNode outputNode = result.getResult();
426             DataObject outputTO = null;
427             if (outputNode != null) {
428                 Mapper<?> outputMapper = mapper.getOutputMapper();
429                 outputTO = outputMapper.objectFromDom(outputNode);
430             }
431             return Rpcs.getRpcResult(result.isSuccessful(), outputTO,
432                     result.getErrors());
433         }
434
435     }
436 }