/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.binding.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.BindingAwareService;
import org.opendaylight.controller.sal.binding.spi.Mapper;
import org.opendaylight.controller.sal.binding.spi.MappingProvider;
import org.opendaylight.controller.sal.binding.spi.RpcMapper;
import org.opendaylight.controller.sal.binding.spi.RpcMapper.RpcProxyInvocationHandler;
import org.opendaylight.controller.sal.binding.spi.SALBindingModule;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.yang.binding.DataObject;
import org.opendaylight.controller.yang.binding.RpcService;
import org.opendaylight.controller.yang.common.QName;
import org.opendaylight.controller.yang.common.RpcResult;
import org.opendaylight.controller.yang.data.api.CompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
43 public class BindingBrokerImpl implements BindingAwareBroker {
45 private static Logger log = LoggerFactory
46 .getLogger(BindingBrokerImpl.class);
48 private Set<ConsumerSessionImpl> sessions = new HashSet<ConsumerSessionImpl>();
49 private Set<ProviderSessionImpl> providerSessions = new HashSet<ProviderSessionImpl>();
51 private Set<SALBindingModule> modules = new HashSet<SALBindingModule>();
52 private Map<Class<? extends BindingAwareService>, SALBindingModule> salServiceProviders = new HashMap<Class<? extends BindingAwareService>, SALBindingModule>();
53 private MappingProvider mapping;
54 private BIFacade biFacade = new BIFacade();
55 private org.opendaylight.controller.sal.core.api.Broker.ProviderSession biSession;
56 private ExecutorService executor;
58 Map<Class<? extends RpcService>, RpcService> rpcImpls = Collections
59 .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());
61 private RpcProxyInvocationHandler rpcProxyHandler = new RpcProxyInvocationHandler() {
64 public Future<RpcResult<? extends DataObject>> invokeRpc(
65 RpcService proxy, QName rpc, DataObject input) {
66 return rpcProxyInvoked(proxy, rpc, input);
71 public ConsumerSession registerConsumer(BindingAwareConsumer consumer) {
72 checkPredicates(consumer);
73 log.info("Registering consumer " + consumer);
75 ConsumerSessionImpl session = newSessionFor(consumer);
76 consumer.onSessionInitialized(session);
78 sessions.add(session);
83 public ProviderSession registerProvider(BindingAwareProvider provider) {
84 checkPredicates(provider);
86 ProviderSessionImpl session = newSessionFor(provider);
87 provider.onSessionInitiated(session);
89 providerSessions.add(session);
93 public void addModule(SALBindingModule module) {
94 log.info("Registering broker module " + module);
95 if (modules.contains(module)) {
96 log.error("Module already registered");
97 throw new IllegalArgumentException("Module already exists.");
100 Set<Class<? extends BindingAwareService>> provServices = module
101 .getProvidedServices();
102 for (Class<? extends BindingAwareService> serviceType : provServices) {
103 log.info(" Registering session service implementation: "
104 + serviceType.getCanonicalName());
105 salServiceProviders.put(serviceType, module);
109 public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
110 sessions.remove(consumerSessionImpl);
111 providerSessions.remove(consumerSessionImpl);
114 private void checkPredicates(BindingAwareProvider prov) {
116 throw new IllegalArgumentException("Provider should not be null.");
117 for (ProviderSessionImpl session : providerSessions) {
118 if (prov.equals(session.getProvider()))
119 throw new IllegalStateException("Provider already registered");
124 private void checkPredicates(BindingAwareConsumer cons) {
126 throw new IllegalArgumentException("Consumer should not be null.");
127 for (ConsumerSessionImpl session : sessions) {
128 if (cons.equals(session.getConsumer()))
129 throw new IllegalStateException("Consumer already registered");
133 private ConsumerSessionImpl newSessionFor(BindingAwareConsumer cons) {
134 return new ConsumerSessionImpl(cons);
137 private ProviderSessionImpl newSessionFor(BindingAwareProvider provider) {
138 return new ProviderSessionImpl(provider);
141 private <T extends BindingAwareService> T newSALServiceForSession(
142 Class<T> service, ConsumerSession session) {
144 SALBindingModule serviceProvider = salServiceProviders.get(service);
145 if (serviceProvider == null) {
148 return serviceProvider.getServiceForSession(service, session);
152 private <T extends RpcService> T newRpcProxyForSession(Class<T> service) {
154 RpcMapper<T> mapper = mapping.rpcMapperForClass(service);
155 if (mapper == null) {
156 log.error("Mapper for " + service + "is unavailable.");
159 T proxy = mapper.getConsumerProxy(rpcProxyHandler);
164 private Future<RpcResult<? extends DataObject>> rpcProxyInvoked(
165 RpcService rpcProxy, QName rpcType, DataObject inputData) {
166 if (rpcProxy == null) {
167 throw new IllegalArgumentException("Proxy must not be null");
169 if (rpcType == null) {
170 throw new IllegalArgumentException(
171 "rpcType (QName) should not be null");
173 Future<RpcResult<? extends DataObject>> ret = null;
175 // Real invocation starts here
176 RpcMapper<? extends RpcService> mapper = mapping
177 .rpcMapperForProxy(rpcProxy);
178 RpcService impl = rpcImpls.get(mapper.getServiceClass());
181 // RPC is probably remote
182 CompositeNode inputNode = null;
183 Mapper<? extends DataObject> inputMapper = mapper.getInputMapper();
184 if (inputMapper != null) {
185 inputNode = inputMapper.domFromObject(inputData);
187 Future<RpcResult<CompositeNode>> biResult = biSession.rpc(rpcType,
189 ret = new TranslatedFuture(biResult, mapper);
193 Callable<RpcResult<? extends DataObject>> invocation = localRpcCallableFor(
194 impl, mapper, rpcType, inputData);
195 ret = executor.submit(invocation);
200 private Callable<RpcResult<? extends DataObject>> localRpcCallableFor(
201 final RpcService impl,
202 final RpcMapper<? extends RpcService> mapper, final QName rpcType,
203 final DataObject inputData) {
205 return new Callable<RpcResult<? extends DataObject>>() {
208 public RpcResult<? extends DataObject> call() throws Exception {
209 return mapper.invokeRpcImplementation(rpcType, impl, inputData);
214 // Binding Independent invocation of Binding Aware RPC
215 private RpcResult<CompositeNode> invokeLocalRpc(QName rpc,
216 CompositeNode inputNode) {
217 RpcMapper<? extends RpcService> mapper = mapping.rpcMapperForData(rpc,
220 DataObject inputTO = mapper.getInputMapper().objectFromDom(inputNode);
222 RpcService impl = rpcImpls.get(mapper.getServiceClass());
224 log.warn("Implementation for rpc: " + rpc + "not available.");
226 RpcResult<? extends DataObject> result = mapper
227 .invokeRpcImplementation(rpc, impl, inputTO);
228 DataObject outputTO = result.getResult();
230 CompositeNode outputNode = null;
231 if (outputTO != null) {
232 outputNode = mapper.getOutputMapper().domFromObject(outputTO);
234 return Rpcs.getRpcResult(result.isSuccessful(), outputNode,
238 private class ConsumerSessionImpl implements
239 BindingAwareBroker.ConsumerSession {
241 private final BindingAwareConsumer consumer;
242 private Map<Class<? extends BindingAwareService>, BindingAwareService> sessionSalServices = Collections
243 .synchronizedMap(new HashMap<Class<? extends BindingAwareService>, BindingAwareService>());
245 private Map<Class<? extends RpcService>, RpcService> sessionRpcProxies = Collections
246 .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());
248 public ConsumerSessionImpl(BindingAwareConsumer cons) {
249 this.consumer = cons;
253 public <T extends BindingAwareService> T getSALService(Class<T> service) {
255 BindingAwareService serv = sessionSalServices.get(service);
257 if (service.isInstance(serv)) {
258 @SuppressWarnings("unchecked")
262 log.error("Implementation for service " + service.getName()
263 + " does not implement the service interface");
264 throw new IllegalStateException("Service implementation "
265 + serv.getClass().getName() + "does not implement "
266 + service.getName());
269 T ret = BindingBrokerImpl.this.newSALServiceForSession(service,
272 sessionSalServices.put(service, ret);
279 public <T extends RpcService> T getRpcService(Class<T> service) {
280 RpcService current = sessionRpcProxies.get(service);
281 if (current != null) {
282 if (service.isInstance(current)) {
283 @SuppressWarnings("unchecked")
287 log.error("Proxy for rpc service " + service.getName()
288 + " does not implement the service interface");
289 throw new IllegalStateException("Service implementation "
290 + current.getClass().getName()
291 + "does not implement " + service.getName());
294 T ret = BindingBrokerImpl.this.newRpcProxyForSession(service);
296 sessionRpcProxies.put(service, ret);
302 public BindingAwareConsumer getConsumer() {
303 return this.consumer;
308 private class ProviderSessionImpl extends ConsumerSessionImpl implements
309 BindingAwareBroker.ProviderSession {
311 private final BindingAwareProvider provider;
313 public ProviderSessionImpl(BindingAwareProvider provider2) {
315 this.provider = provider2;
319 public void addRpcImplementation(RpcService implementation) {
320 if (implementation == null) {
321 throw new IllegalArgumentException(
322 "Implementation should not be null");
324 // TODO Implement this method
325 throw new UnsupportedOperationException("Not implemented");
329 public void removeRpcImplementation(RpcService implementation) {
330 if (implementation == null) {
331 throw new IllegalArgumentException(
332 "Implementation should not be null");
334 // TODO Implement this method
335 throw new UnsupportedOperationException("Not implemented");
338 public BindingAwareProvider getProvider() {
339 return this.provider;
344 private class BIFacade implements Provider,RpcImplementation {
347 public Set<QName> getSupportedRpcs() {
348 return Collections.emptySet();
352 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
354 throw new IllegalArgumentException(
355 "Rpc type should not be null");
358 return BindingBrokerImpl.this.invokeLocalRpc(rpc, input);
362 public void onSessionInitiated(
363 org.opendaylight.controller.sal.core.api.Broker.ProviderSession session) {
365 BindingBrokerImpl.this.biSession = session;
366 for (SALBindingModule module : modules) {
368 module.onBISessionAvailable(biSession);
369 } catch(Exception e) {
370 log.error("Module " +module +" throwed unexpected exception",e);
376 public Collection<ProviderFunctionality> getProviderFunctionality() {
377 return Collections.emptySet();
382 private static class TranslatedFuture implements
383 Future<RpcResult<? extends DataObject>> {
/** Future of the binding-independent call that this future delegates to. */
private final Future<RpcResult<CompositeNode>> realFuture;
/** Mapper whose output mapper converts the DOM result to a binding object. */
private final RpcMapper<?> mapper;

/**
 * @param future binding-independent future to wrap
 * @param mapper mapper used to translate the result
 */
public TranslatedFuture(Future<RpcResult<CompositeNode>> future,
        RpcMapper<?> mapper) {
    this.realFuture = future;
    this.mapper = mapper;
}
394 public boolean cancel(boolean mayInterruptIfRunning) {
395 return realFuture.cancel(mayInterruptIfRunning);
399 public boolean isCancelled() {
400 return realFuture.isCancelled();
404 public boolean isDone() {
405 return realFuture.isDone();
409 public RpcResult<? extends DataObject> get()
410 throws InterruptedException, ExecutionException {
411 RpcResult<CompositeNode> val = realFuture.get();
412 return tranlate(val);
416 public RpcResult<? extends DataObject> get(long timeout, TimeUnit unit)
417 throws InterruptedException, ExecutionException,
419 RpcResult<CompositeNode> val = realFuture.get(timeout, unit);
420 return tranlate(val);
423 private RpcResult<? extends DataObject> tranlate(
424 RpcResult<CompositeNode> result) {
425 CompositeNode outputNode = result.getResult();
426 DataObject outputTO = null;
427 if (outputNode != null) {
428 Mapper<?> outputMapper = mapper.getOutputMapper();
429 outputTO = outputMapper.objectFromDom(outputNode);
431 return Rpcs.getRpcResult(result.isSuccessful(), outputTO,