X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2Fconnect%2Fdom%2FBindingIndependentConnector.java;h=fe8c4a151c1691495ea4e5f00bf443b77878d930;hp=2b40437d6446a9cce652b8245c82c3dc2342e8f9;hb=b15ad095a13977fad81fabd266cb5d832164d3a7;hpb=7a3ffec06a6126cfa4c66835b4c14f135ff432b4 diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 2b40437d64..fe8c4a151c 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -57,10 +57,14 @@ import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; import org.opendaylight.controller.sal.core.api.notify.NotificationListener; import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration.CompositeObjectRegistrationBuilder; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.ObjectRegistration; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.Augmentable; import org.opendaylight.yangtools.yang.binding.Augmentation; @@ -77,8 +81,6 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.opendaylight.yangtools.yang.data.api.Node; -import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.slf4j.Logger; @@ -87,7 +89,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.util.concurrent.Futures; @@ -325,7 +326,10 @@ public class BindingIndependentConnector implements // public void startRpcForwarding() { if (biRpcRegistry != null && baRpcRegistry instanceof RouteChangePublisher) { checkState(!rpcForwarding, "Connector is already forwarding RPCs"); - domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(new DomToBindingRpcForwardingManager()); + final DomToBindingRpcForwardingManager biFwdManager = new DomToBindingRpcForwardingManager(); + + domToBindingRpcManager = baRpcRegistry.registerRouteChangeListener(biFwdManager); + biRpcRegistry.addRpcRegistrationListener(biFwdManager); if (baRpcRegistry instanceof RpcProviderRegistryImpl) { baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry; baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance()); @@ -490,7 +494,7 @@ public class BindingIndependentConnector implements // public void onRegister( final DataCommitHandlerRegistration, DataObject> registration) { - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration + mappingService.toDataDom(registration .getPath()); } @@ -532,7 +536,7 @@ public class BindingIndependentConnector implements // */ private class DomToBindingRpcForwardingManager implements RouteChangeListener>, RouterInstantiationListener, - GlobalRpcRegistrationListener { + GlobalRpcRegistrationListener, RpcRegistrationListener { private final Map, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>(); private RpcProviderRegistryImpl registryImpl; @@ -547,7 +551,7 @@ public class BindingIndependentConnector implements // @Override public void onGlobalRpcRegistered(final Class cls) { - getRpcForwarder(cls, null); + getRpcForwarder(cls, null).registerToDOMBroker(); } @Override @@ -592,31 +596,40 @@ public class BindingIndependentConnector implements // return potential; } + @Override + public void onRpcImplementationAdded(final QName name) { + + final Optional> rpcInterface = mappingService.getRpcServiceClassFor( + name.getNamespace().toString(), name.getFormattedRevision()); + if (rpcInterface.isPresent()) { + getRpcForwarder(rpcInterface.get(), null).registerToBidningBroker(); + } + } + + @Override + public void onRpcImplementationRemoved(final QName name) { + + } } private class DomToBindingRpcForwarder implements RpcImplementation, InvocationHandler { private final Set supportedRpcs; private final WeakReference> rpcServiceType; - private final Set registrations; + private Set registrations; private final Map strategiesByQName = new HashMap<>(); private final WeakHashMap strategiesByMethod = new WeakHashMap<>(); + private final RpcService proxy; + private ObjectRegistration forwarderRegistration; public DomToBindingRpcForwarder(final Class service) { this.rpcServiceType = new WeakReference>(service); this.supportedRpcs = mappingService.getRpcQNamesFor(service); - try { - for (QName rpc : supportedRpcs) { - RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service); - strategiesByMethod.put(strategy.targetMethod, strategy); - strategiesByQName.put(rpc, strategy); - biRpcRegistry.addRpcImplementation(rpc, this); - } - } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", service.getName(), e); - } - registrations = ImmutableSet.of(); + Class cls = rpcServiceType.get(); + ClassLoader clsLoader = cls.getClassLoader(); + proxy =(RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); + createStrategies(); } /** @@ -626,16 +639,12 @@ public class BindingIndependentConnector implements // * @param context */ public DomToBindingRpcForwarder(final Class service, - final Class context) { - this.rpcServiceType = new WeakReference>(service); - this.supportedRpcs = mappingService.getRpcQNamesFor(service); + final Class context) { + this(service); Builder registrationsBuilder = ImmutableSet . builder(); try { for (QName rpc : supportedRpcs) { - RpcInvocationStrategy strategy = createInvocationStrategy(rpc, service); - strategiesByMethod.put(strategy.targetMethod, strategy); - strategiesByQName.put(rpc, strategy); registrationsBuilder.add(biRpcRegistry.addRoutedRpcImplementation(rpc, this)); } createDefaultDomForwarder(); @@ -645,6 +654,46 @@ public class BindingIndependentConnector implements // registrations = registrationsBuilder.build(); } + + + private void createStrategies() { + try { + for (QName rpc : supportedRpcs) { + RpcInvocationStrategy strategy = createInvocationStrategy(rpc, rpcServiceType.get()); + strategiesByMethod.put(strategy.targetMethod, strategy); + strategiesByQName.put(rpc, strategy); + } + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); + } + + } + + /** + * Registers RPC Forwarder to DOM Broker, + * this means Binding Aware Broker has implementation of RPC + * which is registered to it. + * + * If RPC Forwarder was previously registered to DOM Broker + * or to Bidning Broker this method is noop to prevent + * creating forwarding loop. + * + */ + public void registerToDOMBroker() { + if(forwarderRegistration == null) { + CompositeObjectRegistrationBuilder builder = CompositeObjectRegistration.builderFor(this); + try { + for (QName rpc : supportedRpcs) { + builder.add(biRpcRegistry.addRpcImplementation(rpc, this)); + } + } catch (Exception e) { + LOG.error("Could not forward Rpcs of type {}", rpcServiceType.get(), e); + } + this.forwarderRegistration = builder.toInstance(); + } + } + + public void registerPaths(final Class context, final Class service, final Set> set) { QName ctx = BindingReflections.findQName(context); @@ -736,221 +785,30 @@ public class BindingIndependentConnector implements // } } checkState(targetMethod != null, "Rpc method not found"); - Optional> outputClass = BindingReflections.resolveRpcOutputClass(targetMethod); - Optional> inputClass = BindingReflections - .resolveRpcInputClass(targetMethod); - - RpcInvocationStrategy strategy = null; - if (outputClass.isPresent()) { - if (inputClass.isPresent()) { - strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass - .get()); - } else { - strategy = new NoInputInvocationStrategy(rpc, targetMethod, outputClass.get()); - } - } else if (inputClass.isPresent()) { - strategy = new NoOutputInvocationStrategy(rpc, targetMethod, inputClass.get()); - } else { - strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod); - } - return strategy; + return new RpcInvocationStrategy(rpc,targetMethod, mappingService, biRpcRegistry); } }); } - } - - private abstract class RpcInvocationStrategy { - - protected final Method targetMethod; - protected final QName rpc; - - public RpcInvocationStrategy(final QName rpc, final Method targetMethod) { - this.targetMethod = targetMethod; - this.rpc = rpc; - } - - public abstract Future> forwardToDomBroker(DataObject input); - - public abstract RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) - throws Exception; - - public RpcResult invokeOn(final RpcService rpcService, final CompositeNode domInput) - throws Exception { - return uncheckedInvoke(rpcService, domInput); - } - } - - private class DefaultInvocationStrategy extends RpcInvocationStrategy { - - @SuppressWarnings("rawtypes") - private final WeakReference inputClass; - - @SuppressWarnings("rawtypes") - private final WeakReference outputClass; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public DefaultInvocationStrategy(final QName rpc, final Method targetMethod, final Class outputClass, - final Class inputClass) { - super(rpc, targetMethod); - this.outputClass = new WeakReference(outputClass); - this.inputClass = new WeakReference(inputClass); - } - @SuppressWarnings("unchecked") - @Override - public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) - throws Exception { - DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); - Future> futureResult = (Future>) targetMethod.invoke(rpcService, bindingInput); - if (futureResult == null) { - return Rpcs.getRpcResult(false); - } - RpcResult bindingResult = futureResult.get(); - final Object resultObj = bindingResult.getResult(); - if (resultObj instanceof DataObject) { - final CompositeNode output = mappingService.toDataDom((DataObject) resultObj); - return Rpcs.getRpcResult(true, output, Collections. emptySet()); - } - return Rpcs.getRpcResult(true); - } - - @Override - public ListenableFuture> forwardToDomBroker(final DataObject input) { - if (biRpcRegistry == null) { - return Futures.> immediateFuture(Rpcs.getRpcResult(false)); - } - - CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); - - return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), - new Function, RpcResult>() { - @Override - public RpcResult apply(final RpcResult input) { - Object baResultValue = null; - if (input.getResult() != null) { - baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), - input.getResult()); - } - return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors()); - } - }); - } - } - - private class NoInputInvocationStrategy extends RpcInvocationStrategy { - - @SuppressWarnings("rawtypes") - private final WeakReference outputClass; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public NoInputInvocationStrategy(final QName rpc, final Method targetMethod, final Class outputClass) { - super(rpc, targetMethod); - this.outputClass = new WeakReference(outputClass); - } - - @SuppressWarnings("unchecked") - @Override - public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) - throws Exception { - Future> futureResult = (Future>) targetMethod.invoke(rpcService); - if (futureResult == null) { - return Rpcs.getRpcResult(false); - } - RpcResult bindingResult = futureResult.get(); - final Object resultObj = bindingResult.getResult(); - if (resultObj instanceof DataObject) { - final CompositeNode output = mappingService.toDataDom((DataObject) resultObj); - return Rpcs.getRpcResult(true, output, Collections. emptySet()); - } - return Rpcs.getRpcResult(true); - } - - @Override - public Future> forwardToDomBroker(final DataObject input) { - if (biRpcRegistry != null) { - CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); - return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), - new Function, RpcResult>() { - @Override - public RpcResult apply(final RpcResult input) { - Object baResultValue = null; - if (input.getResult() != null) { - baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), - input.getResult()); - } - return Rpcs.getRpcResult(input.isSuccessful(), baResultValue, input.getErrors()); - } - }); - } else { - return Futures.> immediateFuture(Rpcs.getRpcResult(false)); - } - } - } - - private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy { - - public NoInputNoOutputInvocationStrategy(final QName rpc, final Method targetMethod) { - super(rpc, targetMethod); - } - - @Override - public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) - throws Exception { - @SuppressWarnings("unchecked") - Future> result = (Future>) targetMethod.invoke(rpcService); - RpcResult bindingResult = result.get(); - return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors()); - } - - @Override - public Future> forwardToDomBroker(final DataObject input) { - return Futures.immediateFuture(null); - } - } - - private class NoOutputInvocationStrategy extends RpcInvocationStrategy { - - @SuppressWarnings("rawtypes") - private final WeakReference inputClass; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public NoOutputInvocationStrategy(final QName rpc, final Method targetMethod, - final Class inputClass) { - super(rpc, targetMethod); - this.inputClass = new WeakReference(inputClass); - } - - @Override - public RpcResult uncheckedInvoke(final RpcService rpcService, final CompositeNode domInput) - throws Exception { - DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); - Future> result = (Future>) targetMethod.invoke(rpcService, bindingInput); - if (result == null) { - return Rpcs.getRpcResult(false); - } - RpcResult bindingResult = result.get(); - return Rpcs.getRpcResult(true); - } - - @Override - public ListenableFuture> forwardToDomBroker(final DataObject input) { - if (biRpcRegistry == null) { - return Futures.> immediateFuture(Rpcs.getRpcResult(false)); + /** + * Registers RPC Forwarder to Binding Broker, + * this means DOM Broekr has implementation of RPC + * which is registered to it. + * + * If RPC Forwarder was previously registered to DOM Broker + * or to Bidning Broker this method is noop to prevent + * creating forwarding loop. + * + */ + public void registerToBidningBroker() { + if(forwarderRegistration == null) { + try { + this.forwarderRegistration = baRpcRegistry.addRpcImplementation((Class)rpcServiceType.get(), proxy); + } catch (Exception e) { + LOG.error("Unable to forward RPCs for {}",rpcServiceType.get(),e); + } } - - CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); - - return Futures.transform(biRpcRegistry.invokeRpc(rpc, wrappedXml), - new Function, RpcResult>() { - @Override - public RpcResult apply(final RpcResult input) { - return Rpcs. getRpcResult(input.isSuccessful(), null, input.getErrors()); - } - }); } }