/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.binding.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer;
import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
import org.opendaylight.controller.sal.binding.api.BindingAwareService;
import org.opendaylight.controller.sal.binding.spi.Mapper;
import org.opendaylight.controller.sal.binding.spi.MappingProvider;
import org.opendaylight.controller.sal.binding.spi.RpcMapper;
import org.opendaylight.controller.sal.binding.spi.RpcMapper.RpcProxyInvocationHandler;
import org.opendaylight.controller.sal.binding.spi.SALBindingModule;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.yang.binding.DataObject;
import org.opendaylight.controller.yang.binding.RpcService;
import org.opendaylight.controller.yang.common.QName;
import org.opendaylight.controller.yang.common.RpcResult;
import org.opendaylight.controller.yang.data.api.CompositeNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Binding-aware broker implementation.
 *
 * <p>
 * The broker keeps track of registered consumer and provider sessions and of
 * the {@link SALBindingModule}s that supply session services. RPC proxies
 * handed out to sessions funnel every call into
 * {@link #rpcProxyInvoked(RpcService, QName, DataObject)}, which either runs
 * a locally registered implementation on the executor or forwards the call
 * to the binding-independent session after translating the input with the
 * RPC's {@link Mapper}.
 *
 * <p>
 * NOTE(review): generic type arguments in this file were reconstructed from
 * how the values are used; confirm them against the API/SPI interfaces.
 */
public class BindingBrokerImpl implements BindingAwareBroker {

    private static final Logger log = LoggerFactory
            .getLogger(BindingBrokerImpl.class);

    /** Sessions created by {@link #registerConsumer(BindingAwareConsumer)}. */
    private final Set<ConsumerSessionImpl> sessions = new HashSet<ConsumerSessionImpl>();

    /** Sessions created by {@link #registerProvider(BindingAwareProvider)}. */
    private final Set<ProviderSessionImpl> providerSessions = new HashSet<ProviderSessionImpl>();

    /** Modules registered through {@link #addModule(SALBindingModule)}. */
    private final Set<SALBindingModule> modules = new HashSet<SALBindingModule>();

    /** Service type to the module that creates per-session instances of it. */
    private final Map<Class<? extends BindingAwareService>, SALBindingModule> salServiceProviders =
            new HashMap<Class<? extends BindingAwareService>, SALBindingModule>();

    // NOTE(review): nothing in this file assigns 'mapping' or 'executor';
    // presumably they are injected from outside - confirm, otherwise every
    // RPC path below fails with a NullPointerException.
    private MappingProvider mapping;
    private final BIFacade biFacade = new BIFacade();
    private org.opendaylight.controller.sal.core.api.Broker.ProviderSession biSession;
    private ExecutorService executor;

    /** Locally available RPC implementations, keyed by service interface. */
    Map<Class<? extends RpcService>, RpcService> rpcImpls = Collections
            .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());

    /** Handler shared by all consumer proxies; delegates to the broker. */
    private final RpcProxyInvocationHandler rpcProxyHandler = new RpcProxyInvocationHandler() {

        @Override
        public Future<RpcResult<? extends DataObject>> invokeRpc(
                RpcService proxy, QName rpc, DataObject input) {
            return rpcProxyInvoked(proxy, rpc, input);
        }
    };

    /**
     * Creates a session for the consumer, notifies the consumer about it and
     * starts tracking the session.
     *
     * @param consumer consumer to register
     * @return the newly created session
     * @throws IllegalArgumentException if {@code consumer} is {@code null}
     * @throws IllegalStateException if an equal consumer is already registered
     */
    @Override
    public ConsumerSession registerConsumer(BindingAwareConsumer consumer) {
        checkPredicates(consumer);
        log.info("Registering consumer {}", consumer);
        ConsumerSessionImpl session = newSessionFor(consumer);
        consumer.onSessionInitialized(session);
        sessions.add(session);
        return session;
    }

    /**
     * Creates a session for the provider, notifies the provider about it and
     * starts tracking the session.
     *
     * @param provider provider to register
     * @return the newly created session
     * @throws IllegalArgumentException if {@code provider} is {@code null}
     * @throws IllegalStateException if an equal provider is already registered
     */
    @Override
    public ProviderSession registerProvider(BindingAwareProvider provider) {
        checkPredicates(provider);
        ProviderSessionImpl session = newSessionFor(provider);
        provider.onSessionInitiated(session);
        providerSessions.add(session);
        return session;
    }

    /**
     * Registers a broker module and every service type it provides.
     *
     * @param module module to register
     * @throws IllegalArgumentException if the module is already registered
     */
    public void addModule(SALBindingModule module) {
        log.info("Registering broker module {}", module);
        if (modules.contains(module)) {
            log.error("Module already registered");
            throw new IllegalArgumentException("Module already exists.");
        }

        Set<Class<? extends BindingAwareService>> provServices = module
                .getProvidedServices();

        // Remember the module: without this the duplicate check above can
        // never fire and BIFacade.onSessionInitiated has no modules to notify.
        modules.add(module);

        for (Class<? extends BindingAwareService> serviceType : provServices) {
            log.info(" Registering session service implementation: {}",
                    serviceType.getCanonicalName());
            salServiceProviders.put(serviceType, module);
        }
    }

    /**
     * Stops tracking a closed session in both the consumer and the provider
     * session sets.
     *
     * @param consumerSessionImpl session that was closed
     */
    public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
        sessions.remove(consumerSessionImpl);
        providerSessions.remove(consumerSessionImpl);
    }

    /** Rejects a {@code null} provider or one equal to a registered provider. */
    private void checkPredicates(BindingAwareProvider prov) {
        if (prov == null) {
            throw new IllegalArgumentException("Provider should not be null.");
        }
        for (ProviderSessionImpl session : providerSessions) {
            if (prov.equals(session.getProvider())) {
                throw new IllegalStateException("Provider already registered");
            }
        }
    }

    /** Rejects a {@code null} consumer or one equal to a registered consumer. */
    private void checkPredicates(BindingAwareConsumer cons) {
        if (cons == null) {
            throw new IllegalArgumentException("Consumer should not be null.");
        }
        for (ConsumerSessionImpl session : sessions) {
            if (cons.equals(session.getConsumer())) {
                throw new IllegalStateException("Consumer already registered");
            }
        }
    }

    private ConsumerSessionImpl newSessionFor(BindingAwareConsumer cons) {
        return new ConsumerSessionImpl(cons);
    }

    private ProviderSessionImpl newSessionFor(BindingAwareProvider provider) {
        return new ProviderSessionImpl(provider);
    }

    /**
     * Asks the module registered for {@code service} to create an instance for
     * the given session.
     *
     * @return the service instance, or {@code null} when no module provides
     *         the requested service type
     */
    private <T extends BindingAwareService> T newSALServiceForSession(
            Class<T> service, ConsumerSession session) {
        SALBindingModule serviceProvider = salServiceProviders.get(service);
        if (serviceProvider == null) {
            return null;
        }
        return serviceProvider.getServiceForSession(service, session);
    }

    /**
     * Creates a consumer proxy for the RPC service whose calls are routed
     * through {@link #rpcProxyHandler}.
     *
     * @return the proxy, or {@code null} when no mapper exists for the service
     */
    private <T extends RpcService> T newRpcProxyForSession(Class<T> service) {
        RpcMapper<T> mapper = mapping.rpcMapperForClass(service);
        if (mapper == null) {
            log.error("Mapper for {} is unavailable.", service);
            return null;
        }
        return mapper.getConsumerProxy(rpcProxyHandler);
    }

    /**
     * Entry point for every call made on a consumer RPC proxy.
     *
     * <p>
     * When an implementation of the proxied service is present in
     * {@link #rpcImpls} the call is submitted to the executor; otherwise the
     * input is converted to a {@link CompositeNode} (if an input mapper
     * exists) and sent through the binding-independent session.
     *
     * @throws IllegalArgumentException if {@code rpcProxy} or {@code rpcType}
     *         is {@code null}
     */
    private Future<RpcResult<? extends DataObject>> rpcProxyInvoked(
            RpcService rpcProxy, QName rpcType, DataObject inputData) {
        if (rpcProxy == null) {
            throw new IllegalArgumentException("Proxy must not be null");
        }
        if (rpcType == null) {
            throw new IllegalArgumentException(
                    "rpcType (QName) should not be null");
        }
        Future<RpcResult<? extends DataObject>> ret = null;

        // Real invocation starts here
        // NOTE(review): mapper is dereferenced without a null check - confirm
        // rpcMapperForProxy never returns null for proxies created here.
        RpcMapper<? extends RpcService> mapper = mapping
                .rpcMapperForProxy(rpcProxy);
        RpcService impl = rpcImpls.get(mapper.getServiceClass());

        if (impl == null) {
            // RPC is probably remote
            CompositeNode inputNode = null;
            Mapper<? extends DataObject> inputMapper = mapper.getInputMapper();
            if (inputMapper != null) {
                inputNode = inputMapper.domFromObject(inputData);
            }
            Future<RpcResult<CompositeNode>> biResult = biSession.rpc(rpcType,
                    inputNode);
            ret = new TranslatedFuture(biResult, mapper);
        } else {
            // RPC is local
            Callable<RpcResult<? extends DataObject>> invocation = localRpcCallableFor(
                    impl, mapper, rpcType, inputData);
            ret = executor.submit(invocation);
        }
        return ret;
    }

    /** Wraps a local RPC invocation so that it can be run on the executor. */
    private Callable<RpcResult<? extends DataObject>> localRpcCallableFor(
            final RpcService impl,
            final RpcMapper<? extends RpcService> mapper, final QName rpcType,
            final DataObject inputData) {

        return new Callable<RpcResult<? extends DataObject>>() {

            @Override
            public RpcResult<? extends DataObject> call() throws Exception {
                return mapper.invokeRpcImplementation(rpcType, impl, inputData);
            }
        };
    }

    /**
     * Binding-independent invocation of a binding-aware RPC: converts the
     * input node to its transfer object, invokes the implementation through
     * the mapper and converts a non-null output back to a
     * {@link CompositeNode}.
     */
    private RpcResult<CompositeNode> invokeLocalRpc(QName rpc,
            CompositeNode inputNode) {
        RpcMapper<? extends RpcService> mapper = mapping.rpcMapperForData(rpc,
                inputNode);

        DataObject inputTO = mapper.getInputMapper().objectFromDom(inputNode);

        RpcService impl = rpcImpls.get(mapper.getServiceClass());
        if (impl == null) {
            // NOTE(review): the call below still proceeds with a null
            // implementation; what happens then depends on the mapper.
            log.warn("Implementation for rpc: {} not available.", rpc);
        }
        RpcResult<? extends DataObject> result = mapper
                .invokeRpcImplementation(rpc, impl, inputTO);
        DataObject outputTO = result.getResult();

        CompositeNode outputNode = null;
        if (outputTO != null) {
            outputNode = mapper.getOutputMapper().domFromObject(outputTO);
        }
        return Rpcs.getRpcResult(result.isSuccessful(), outputNode,
                result.getErrors());
    }

    /**
     * Consumer session that caches the SAL services and RPC proxies it has
     * already handed out, so repeated lookups return the same instance.
     */
    private class ConsumerSessionImpl implements
            BindingAwareBroker.ConsumerSession {

        private final BindingAwareConsumer consumer;

        private final Map<Class<? extends BindingAwareService>, BindingAwareService> sessionSalServices = Collections
                .synchronizedMap(new HashMap<Class<? extends BindingAwareService>, BindingAwareService>());

        private final Map<Class<? extends RpcService>, RpcService> sessionRpcProxies = Collections
                .synchronizedMap(new HashMap<Class<? extends RpcService>, RpcService>());

        public ConsumerSessionImpl(BindingAwareConsumer cons) {
            this.consumer = cons;
        }

        /**
         * Returns the cached service for this session, creating and caching
         * it on first use.
         *
         * @return the service, or {@code null} when no module provides it
         * @throws IllegalStateException if the cached instance does not
         *         implement the requested interface
         */
        @Override
        public <T extends BindingAwareService> T getSALService(Class<T> service) {
            BindingAwareService serv = sessionSalServices.get(service);
            if (serv != null) {
                if (service.isInstance(serv)) {
                    @SuppressWarnings("unchecked")
                    T ret = (T) serv;
                    return ret;
                } else {
                    log.error(
                            "Implementation for service {} does not implement the service interface",
                            service.getName());
                    throw new IllegalStateException("Service implementation "
                            + serv.getClass().getName()
                            + " does not implement " + service.getName());
                }
            } else {
                T ret = BindingBrokerImpl.this.newSALServiceForSession(service,
                        this);
                if (ret != null) {
                    sessionSalServices.put(service, ret);
                }
                return ret;
            }
        }

        /**
         * Returns the cached RPC proxy for this session, creating and caching
         * it on first use.
         *
         * @return the proxy, or {@code null} when no mapper is available
         * @throws IllegalStateException if the cached proxy does not
         *         implement the requested interface
         */
        @Override
        public <T extends RpcService> T getRpcService(Class<T> service) {
            RpcService current = sessionRpcProxies.get(service);
            if (current != null) {
                if (service.isInstance(current)) {
                    @SuppressWarnings("unchecked")
                    T ret = (T) current;
                    return ret;
                } else {
                    log.error(
                            "Proxy for rpc service {} does not implement the service interface",
                            service.getName());
                    throw new IllegalStateException("Service implementation "
                            + current.getClass().getName()
                            + " does not implement " + service.getName());
                }
            } else {
                T ret = BindingBrokerImpl.this.newRpcProxyForSession(service);
                if (ret != null) {
                    sessionRpcProxies.put(service, ret);
                }
                return ret;
            }
        }

        public BindingAwareConsumer getConsumer() {
            return this.consumer;
        }
    }

    /**
     * Provider session. RPC implementation registration is not implemented
     * yet: both methods validate their argument and then throw
     * {@link UnsupportedOperationException}.
     */
    private class ProviderSessionImpl extends ConsumerSessionImpl implements
            BindingAwareBroker.ProviderSession {

        private final BindingAwareProvider provider;

        public ProviderSessionImpl(BindingAwareProvider provider2) {
            // A provider session carries no consumer, so getConsumer()
            // returns null for it.
            super(null);
            this.provider = provider2;
        }

        @Override
        public void addRpcImplementation(RpcService implementation) {
            if (implementation == null) {
                throw new IllegalArgumentException(
                        "Implementation should not be null");
            }
            // TODO Implement this method
            throw new UnsupportedOperationException("Not implemented");
        }

        @Override
        public void removeRpcImplementation(RpcService implementation) {
            if (implementation == null) {
                throw new IllegalArgumentException(
                        "Implementation should not be null");
            }
            // TODO Implement this method
            throw new UnsupportedOperationException("Not implemented");
        }

        public BindingAwareProvider getProvider() {
            return this.provider;
        }
    }

    /**
     * Facade through which the binding-independent broker sees this broker:
     * it receives the binding-independent session and accepts RPC calls that
     * are dispatched to binding-aware implementations.
     */
    private class BIFacade implements Provider, RpcImplementation {

        @Override
        public Set<QName> getSupportedRpcs() {
            return Collections.emptySet();
        }

        @Override
        public RpcResult<CompositeNode> invokeRpc(QName rpc,
                CompositeNode input) {
            if (rpc == null) {
                throw new IllegalArgumentException(
                        "Rpc type should not be null");
            }
            return BindingBrokerImpl.this.invokeLocalRpc(rpc, input);
        }

        /**
         * Stores the binding-independent session and announces it to every
         * registered module; a failure in one module is logged and does not
         * stop the notification of the others.
         */
        @Override
        public void onSessionInitiated(
                org.opendaylight.controller.sal.core.api.Broker.ProviderSession session) {
            BindingBrokerImpl.this.biSession = session;
            for (SALBindingModule module : modules) {
                try {
                    module.onBISessionAvailable(biSession);
                } catch (Exception e) {
                    log.error("Module " + module
                            + " throwed unexpected exception", e);
                }
            }
        }

        @Override
        public Collection<Provider.ProviderFunctionality> getProviderFunctionality() {
            return Collections.emptySet();
        }
    }

    /**
     * Future that delegates to a binding-independent RPC future and converts
     * its {@link CompositeNode} result into a {@link DataObject} using the
     * mapper's output mapper.
     */
    private static class TranslatedFuture implements
            Future<RpcResult<? extends DataObject>> {

        private final Future<RpcResult<CompositeNode>> realFuture;
        private final RpcMapper<?> mapper;

        public TranslatedFuture(Future<RpcResult<CompositeNode>> future,
                RpcMapper<?> mapper) {
            realFuture = future;
            this.mapper = mapper;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return realFuture.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isCancelled() {
            return realFuture.isCancelled();
        }

        @Override
        public boolean isDone() {
            return realFuture.isDone();
        }

        @Override
        public RpcResult<? extends DataObject> get()
                throws InterruptedException, ExecutionException {
            RpcResult<CompositeNode> val = realFuture.get();
            return translate(val);
        }

        @Override
        public RpcResult<? extends DataObject> get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException,
                TimeoutException {
            RpcResult<CompositeNode> val = realFuture.get(timeout, unit);
            return translate(val);
        }

        /**
         * Builds the binding-aware result: success flag and errors are copied
         * over, the output node (if any) is converted by the output mapper.
         */
        private RpcResult<? extends DataObject> translate(
                RpcResult<CompositeNode> result) {
            CompositeNode outputNode = result.getResult();
            DataObject outputTO = null;
            if (outputNode != null) {
                Mapper<?> outputMapper = mapper.getOutputMapper();
                outputTO = outputMapper.objectFromDom(outputNode);
            }
            return Rpcs.getRpcResult(result.isSuccessful(), outputTO,
                    result.getErrors());
        }
    }
}