X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fsal%2Fyang-prototype%2Fsal%2Fsal-binding-broker-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingBrokerImpl.java;fp=opendaylight%2Fsal%2Fyang-prototype%2Fsal%2Fsal-binding-broker-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FBindingBrokerImpl.java;h=32eff18d4a3644069cec3d8cfce0bbdff7fb5c36;hb=517b29d7abee88c4c99b3f75866c6a8e4b8fbd5d;hp=2209f84cac7ca86b29da96487dd1b45329daca5d;hpb=1d01212bc66db4193eb67007267b105fc02cbd71;p=controller.git diff --git a/opendaylight/sal/yang-prototype/sal/sal-binding-broker-impl/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingBrokerImpl.java b/opendaylight/sal/yang-prototype/sal/sal-binding-broker-impl/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingBrokerImpl.java index 2209f84cac..32eff18d4a 100644 --- a/opendaylight/sal/yang-prototype/sal/sal-binding-broker-impl/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingBrokerImpl.java +++ b/opendaylight/sal/yang-prototype/sal/sal-binding-broker-impl/src/main/java/org/opendaylight/controller/sal/binding/impl/BindingBrokerImpl.java @@ -7,17 +7,36 @@ */ package org.opendaylight.controller.sal.binding.impl; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.BindingAwareConsumer; import org.opendaylight.controller.sal.binding.api.BindingAwareProvider; import org.opendaylight.controller.sal.binding.api.BindingAwareService; +import org.opendaylight.controller.sal.binding.spi.Mapper; +import org.opendaylight.controller.sal.binding.spi.MappingProvider; +import org.opendaylight.controller.sal.binding.spi.RpcMapper; +import org.opendaylight.controller.sal.binding.spi.RpcMapper.RpcProxyInvocationHandler; import org.opendaylight.controller.sal.binding.spi.SALBindingModule; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.yang.binding.DataObject; import org.opendaylight.controller.yang.binding.RpcService; +import org.opendaylight.controller.yang.common.QName; +import org.opendaylight.controller.yang.common.RpcResult; +import org.opendaylight.controller.yang.data.api.CompositeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +50,22 @@ public class BindingBrokerImpl implements BindingAwareBroker { private Set modules = new HashSet(); private Map, SALBindingModule> salServiceProviders = new HashMap, SALBindingModule>(); + private MappingProvider mapping; + private BIFacade biFacade = new BIFacade(); + private org.opendaylight.controller.sal.core.api.Broker.ProviderSession biSession; + private ExecutorService executor; + + Map, RpcService> rpcImpls = Collections + .synchronizedMap(new HashMap, RpcService>()); + + private RpcProxyInvocationHandler rpcProxyHandler = new RpcProxyInvocationHandler() { + + @Override + public Future> invokeRpc( + RpcService proxy, QName rpc, DataObject input) { + return rpcProxyInvoked(proxy, rpc, input); + } + }; @Override public ConsumerSession registerConsumer(BindingAwareConsumer consumer) { @@ -41,9 +76,7 @@ public class BindingBrokerImpl implements BindingAwareBroker { consumer.onSessionInitialized(session); sessions.add(session); - return session; - } @Override @@ -116,11 +149,101 @@ public class BindingBrokerImpl implements BindingAwareBroker { } + private T newRpcProxyForSession(Class service) { + + RpcMapper mapper = mapping.rpcMapperForClass(service); + if (mapper == null) { + log.error("Mapper for " + service + "is unavailable."); + return null; + } + T proxy = mapper.getConsumerProxy(rpcProxyHandler); + + return proxy; + } + + private Future> rpcProxyInvoked( + RpcService rpcProxy, QName rpcType, DataObject inputData) { + if (rpcProxy == null) { + throw new IllegalArgumentException("Proxy must not be null"); + } + if (rpcType == null) { + throw new IllegalArgumentException( + "rpcType (QName) should not be null"); + } + Future> ret = null; + + // Real invocation starts here + RpcMapper mapper = mapping + .rpcMapperForProxy(rpcProxy); + RpcService impl = rpcImpls.get(mapper.getServiceClass()); + + if (impl == null) { + // RPC is probably remote + CompositeNode inputNode = null; + Mapper inputMapper = mapper.getInputMapper(); + if (inputMapper != null) { + inputNode = inputMapper.domFromObject(inputData); + } + Future> biResult = biSession.rpc(rpcType, + inputNode); + ret = new TranslatedFuture(biResult, mapper); + + } else { + // RPC is local + Callable> invocation = localRpcCallableFor( + impl, mapper, rpcType, inputData); + ret = executor.submit(invocation); + } + return ret; + } + + private Callable> localRpcCallableFor( + final RpcService impl, + final RpcMapper mapper, final QName rpcType, + final DataObject inputData) { + + return new Callable>() { + + @Override + public RpcResult call() throws Exception { + return mapper.invokeRpcImplementation(rpcType, impl, inputData); + } + }; + } + + // Binding Independent invocation of Binding Aware RPC + private RpcResult invokeLocalRpc(QName rpc, + CompositeNode inputNode) { + RpcMapper mapper = mapping.rpcMapperForData(rpc, + inputNode); + + DataObject inputTO = mapper.getInputMapper().objectFromDom(inputNode); + + RpcService impl = rpcImpls.get(mapper.getServiceClass()); + if (impl == null) { + log.warn("Implementation for rpc: " + rpc + "not available."); + } + RpcResult result = mapper + .invokeRpcImplementation(rpc, impl, inputTO); + DataObject outputTO = result.getResult(); + + CompositeNode outputNode = null; + if (outputTO != null) { + outputNode = mapper.getOutputMapper().domFromObject(outputTO); + } + return Rpcs.getRpcResult(result.isSuccessful(), outputNode, + result.getErrors()); + } + private class ConsumerSessionImpl implements BindingAwareBroker.ConsumerSession { private final BindingAwareConsumer consumer; - private Map, BindingAwareService> sessionSalServices = new HashMap, BindingAwareService>(); + private Map, BindingAwareService> sessionSalServices = Collections + .synchronizedMap(new HashMap, BindingAwareService>()); + + private Map, RpcService> sessionRpcProxies = Collections + .synchronizedMap(new HashMap, RpcService>()); public ConsumerSessionImpl(BindingAwareConsumer cons) { this.consumer = cons; @@ -153,9 +276,27 @@ public class BindingBrokerImpl implements BindingAwareBroker { } @Override - public T getRpcService(Class module) { - // TODO Implement this method - throw new UnsupportedOperationException("Not implemented"); + public T getRpcService(Class service) { + RpcService current = sessionRpcProxies.get(service); + if (current != null) { + if (service.isInstance(current)) { + @SuppressWarnings("unchecked") + T ret = (T) current; + return ret; + } else { + log.error("Proxy for rpc service " + service.getName() + + " does not implement the service interface"); + throw new IllegalStateException("Service implementation " + + current.getClass().getName() + + "does not implement " + service.getName()); + } + } else { + T ret = BindingBrokerImpl.this.newRpcProxyForSession(service); + if (ret != null) { + sessionRpcProxies.put(service, ret); + } + return ret; + } } public BindingAwareConsumer getConsumer() { @@ -176,12 +317,20 @@ public class BindingBrokerImpl implements BindingAwareBroker { @Override public void addRpcImplementation(RpcService implementation) { + if (implementation == null) { + throw new IllegalArgumentException( + "Implementation should not be null"); + } // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @Override public void removeRpcImplementation(RpcService implementation) { + if (implementation == null) { + throw new IllegalArgumentException( + "Implementation should not be null"); + } // TODO Implement this method throw new UnsupportedOperationException("Not implemented"); } @@ -192,4 +341,96 @@ public class BindingBrokerImpl implements BindingAwareBroker { } + private class BIFacade implements Provider,RpcImplementation { + + @Override + public Set getSupportedRpcs() { + return Collections.emptySet(); + } + + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + if (rpc == null) { + throw new IllegalArgumentException( + "Rpc type should not be null"); + } + + return BindingBrokerImpl.this.invokeLocalRpc(rpc, input); + } + + @Override + public void onSessionInitiated( + org.opendaylight.controller.sal.core.api.Broker.ProviderSession session) { + + BindingBrokerImpl.this.biSession = session; + for (SALBindingModule module : modules) { + try { + module.onBISessionAvailable(biSession); + } catch(Exception e) { + log.error("Module " +module +" throwed unexpected exception",e); + } + } + } + + @Override + public Collection getProviderFunctionality() { + return Collections.emptySet(); + } + + } + + private static class TranslatedFuture implements + Future> { + private final Future> realFuture; + private final RpcMapper mapper; + + public TranslatedFuture(Future> future, + RpcMapper mapper) { + realFuture = future; + this.mapper = mapper; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return realFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return realFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return realFuture.isDone(); + } + + @Override + public RpcResult get() + throws InterruptedException, ExecutionException { + RpcResult val = realFuture.get(); + return tranlate(val); + } + + @Override + public RpcResult get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, + TimeoutException { + RpcResult val = realFuture.get(timeout, unit); + return tranlate(val); + } + + private RpcResult tranlate( + RpcResult result) { + CompositeNode outputNode = result.getResult(); + DataObject outputTO = null; + if (outputNode != null) { + Mapper outputMapper = mapper.getOutputMapper(); + outputTO = outputMapper.objectFromDom(outputNode); + } + return Rpcs.getRpcResult(result.isSuccessful(), outputTO, + result.getErrors()); + } + + } }