X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2Fconnect%2Fdom%2FBindingIndependentConnector.java;h=5630664a678e8387426bb6a2b2ff88957b0654e8;hp=7ec1f6512d27a165d878973c79668d679d7f3ee9;hb=10a8a3e140716052475cd641629f302001d18936;hpb=f2f7098e3678756d90a348cefc2b110757ce126a diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 7ec1f6512d..5630664a67 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -17,6 +17,7 @@ import java.lang.reflect.Proxy; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -37,12 +38,17 @@ import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangePublisher; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider; import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; +import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener; import org.opendaylight.controller.sal.common.util.CommitHandlerTransactions; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -52,6 +58,8 @@ import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction; +import org.opendaylight.controller.sal.core.api.notify.NotificationListener; +import org.opendaylight.controller.sal.core.api.notify.NotificationPublishService; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ClassLoaderUtils; @@ -62,6 +70,7 @@ import org.opendaylight.yangtools.yang.binding.BindingMapping; import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.binding.RpcService; import org.opendaylight.yangtools.yang.binding.util.BindingReflections; import org.opendaylight.yangtools.yang.common.QName; @@ -86,17 +95,18 @@ public class BindingIndependentConnector implements // Provider, // AutoCloseable { + + private final Logger LOG = LoggerFactory.getLogger(BindingIndependentConnector.class); - @SuppressWarnings( "deprecation") + @SuppressWarnings("deprecation") private static final InstanceIdentifier ROOT = InstanceIdentifier.builder().toInstance(); private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier ROOT_BI = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier .builder().toInstance(); private final static Method EQUALS_METHOD; - - + private BindingIndependentMappingService mappingService; private org.opendaylight.controller.sal.core.api.data.DataProviderService biDataService; @@ -140,11 +150,14 @@ public class BindingIndependentConnector implements // private RpcProviderRegistryImpl baRpcRegistryImpl; private org.opendaylight.controller.sal.dom.broker.spi.RpcRouter biRouter; - - + + private NotificationProviderService baNotifyService; + + private NotificationPublishService domNotificationService; + static { try { - EQUALS_METHOD = Object.class.getMethod("equals", Object.class); + EQUALS_METHOD = Object.class.getMethod("equals", Object.class); } catch (Exception e) { throw new RuntimeException(e); } @@ -304,8 +317,9 @@ public class BindingIndependentConnector implements // if (baRpcRegistry instanceof RpcProviderRegistryImpl) { baRpcRegistryImpl = (RpcProviderRegistryImpl) baRpcRegistry; baRpcRegistryImpl.registerRouterInstantiationListener(domToBindingRpcManager.getInstance()); + baRpcRegistryImpl.registerGlobalRpcRegistrationListener(domToBindingRpcManager.getInstance()); } - if(biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) { + if (biRpcRegistry instanceof org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) { biRouter = (org.opendaylight.controller.sal.dom.broker.spi.RpcRouter) biRpcRegistry; } rpcForwarding = true; @@ -314,7 +328,11 @@ public class BindingIndependentConnector implements // public void startNotificationForwarding() { checkState(!notificationForwarding, "Connector is already forwarding notifications."); - notificationForwarding = true; + if (baNotifyService != null && domNotificationService != null) { + baNotifyService.registerInterestListener(new DomToBindingNotificationForwarder()); + + notificationForwarding = true; + } } protected void setMappingService(BindingIndependentMappingService mappingService) { @@ -449,18 +467,18 @@ public class BindingIndependentConnector implements // } DataModificationTransaction domTransaction = createBindingToDomTransaction(bindingTransaction); BindingToDomTransaction wrapped = new BindingToDomTransaction(domTransaction, bindingTransaction); - LOG.info("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(), + LOG.trace("Forwarding Binding Transaction: {} as DOM Transaction: {} .", bindingTransaction.getIdentifier(), domTransaction.getIdentifier()); return wrapped; } } private class DomToBindingCommitHandler implements // - RegistrationListener, DataObject>>, // + RegistrationListener, DataObject>>, // DataCommitHandler { @Override - public void onRegister(DataCommitHandlerRegistration, DataObject> registration) { + public void onRegister(DataCommitHandlerRegistration, DataObject> registration) { org.opendaylight.yangtools.yang.data.api.InstanceIdentifier domPath = mappingService.toDataDom(registration .getPath()); @@ -468,7 +486,7 @@ public class BindingIndependentConnector implements // } @Override - public void onUnregister(DataCommitHandlerRegistration, DataObject> registration) { + public void onUnregister(DataCommitHandlerRegistration, DataObject> registration) { // NOOP for now // FIXME: do registration based on only active commit handlers. } @@ -489,7 +507,7 @@ public class BindingIndependentConnector implements // org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction baTransaction = createDomToBindingTransaction(domTransaction); DomToBindingTransaction forwardedTransaction = new DomToBindingTransaction(baTransaction, domTransaction); - LOG.info("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), + LOG.trace("Forwarding DOM Transaction: {} as Binding Transaction: {}.", domTransaction.getIdentifier(), baTransaction.getIdentifier()); return forwardedTransaction; } @@ -498,11 +516,12 @@ public class BindingIndependentConnector implements // /** * Manager responsible for instantiating forwarders responsible for * forwarding of RPC invocations from DOM Broker to Binding Aware Broker - * + * */ private class DomToBindingRpcForwardingManager implements RouteChangeListener>, - RouterInstantiationListener { + RouterInstantiationListener, + GlobalRpcRegistrationListener { private final Map, DomToBindingRpcForwarder> forwarders = new WeakHashMap<>(); private RpcProviderRegistryImpl registryImpl; @@ -514,8 +533,17 @@ public class BindingIndependentConnector implements // public void setRegistryImpl(RpcProviderRegistryImpl registryImpl) { this.registryImpl = registryImpl; } - - + + @Override + public void onGlobalRpcRegistered(Class cls) { + getRpcForwarder(cls, null); + } + + @Override + public void onGlobalRpcUnregistered(Class cls) { + // NOOP + } + @Override public void onRpcRouterCreated(RpcRouter router) { Class ctx = router.getContexts().iterator().next(); @@ -575,14 +603,14 @@ public class BindingIndependentConnector implements // } } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", service.getName()); + LOG.error("Could not forward Rpcs of type {}", service.getName(),e); } registrations = ImmutableSet.of(); } /** * Constructor for Routed RPC Forwareder. - * + * * @param service * @param context */ @@ -600,7 +628,7 @@ public class BindingIndependentConnector implements // } createDefaultDomForwarder(); } catch (Exception e) { - LOG.error("Could not forward Rpcs of type {}", service.getName(),e); + LOG.error("Could not forward Rpcs of type {}", service.getName(), e); } registrations = registrationsBuilder.build(); } @@ -616,16 +644,16 @@ public class BindingIndependentConnector implements // } } - + @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - if(EQUALS_METHOD.equals(method)) { + if (EQUALS_METHOD.equals(method)) { return false; } RpcInvocationStrategy strategy = strategiesByMethod.get(method); checkState(strategy != null); checkArgument(args.length <= 2); - if(args.length == 1) { + if (args.length == 1) { checkArgument(args[0] instanceof DataObject); return strategy.forwardToDomBroker((DataObject) args[0]); } @@ -654,7 +682,7 @@ public class BindingIndependentConnector implements // Class cls = rpcServiceType.get(); ClassLoader clsLoader = cls.getClassLoader(); RpcService proxy = (RpcService) Proxy.newProxyInstance(clsLoader, new Class[] { cls }, this); - + RpcRouter rpcRouter = baRpcRegistryImpl.getRpcRouter(rpcServiceType.get()); rpcRouter.registerDefaultService(proxy); } @@ -703,12 +731,15 @@ public class BindingIndependentConnector implements // RpcInvocationStrategy strategy = null; if (outputClass.isPresent()) { if (inputClass.isPresent()) { - strategy = new DefaultInvocationStrategy(rpc,targetMethod, outputClass.get(), inputClass.get()); + strategy = new DefaultInvocationStrategy(rpc, targetMethod, outputClass.get(), inputClass + .get()); } else { - strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod); + strategy = new NoInputNoOutputInvocationStrategy(rpc, targetMethod); } + } else if(inputClass.isPresent()){ + strategy = new NoOutputInvocationStrategy(rpc,targetMethod, inputClass.get()); } else { - strategy = null; + strategy = new NoInputNoOutputInvocationStrategy(rpc,targetMethod); } return strategy; } @@ -722,7 +753,7 @@ public class BindingIndependentConnector implements // protected final Method targetMethod; protected final QName rpc; - public RpcInvocationStrategy(QName rpc,Method targetMethod) { + public RpcInvocationStrategy(QName rpc, Method targetMethod) { this.targetMethod = targetMethod; this.rpc = rpc; } @@ -748,36 +779,42 @@ public class BindingIndependentConnector implements // @SuppressWarnings({ "rawtypes", "unchecked" }) public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class outputClass, Class inputClass) { - super(rpc,targetMethod); + super(rpc, targetMethod); this.outputClass = new WeakReference(outputClass); this.inputClass = new WeakReference(inputClass); } + @SuppressWarnings("unchecked") @Override public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); - Future> result = (Future>) targetMethod.invoke(rpcService, bindingInput); - if (result == null) { + Future> futureResult = (Future>) targetMethod.invoke(rpcService, bindingInput); + if (futureResult == null) { return Rpcs.getRpcResult(false); } - RpcResult bindingResult = result.get(); + RpcResult bindingResult = futureResult.get(); + final Object resultObj = bindingResult.getResult(); + if (resultObj instanceof DataObject) { + final CompositeNode output = mappingService.toDataDom((DataObject)resultObj); + return Rpcs.getRpcResult(true, output, Collections.emptySet()); + } return Rpcs.getRpcResult(true); } - + @Override public Future> forwardToDomBroker(DataObject input) { - if(biRouter != null) { + if(biRouter != null) { CompositeNode xml = mappingService.toDataDom(input); - CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.>of(xml)); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc, ImmutableList.> of(xml)); RpcResult result = biRouter.invokeRpc(rpc, wrappedXml); Object baResultValue = null; - if(result.getResult() != null) { + if (result.getResult() != null) { baResultValue = mappingService.dataObjectFromDataDom(outputClass.get(), result.getResult()); } RpcResult baResult = Rpcs.getRpcResult(result.isSuccessful(), baResultValue, result.getErrors()); - return Futures.>immediateFuture(baResult); + return Futures.> immediateFuture(baResult); } - return Futures.>immediateFuture(Rpcs.getRpcResult(false)); + return Futures.> immediateFuture(Rpcs.getRpcResult(false)); } } @@ -785,7 +822,7 @@ public class BindingIndependentConnector implements // private class NoInputNoOutputInvocationStrategy extends RpcInvocationStrategy { public NoInputNoOutputInvocationStrategy(QName rpc, Method targetMethod) { - super(rpc,targetMethod); + super(rpc, targetMethod); } public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { @@ -794,12 +831,52 @@ public class BindingIndependentConnector implements // RpcResult bindingResult = result.get(); return Rpcs.getRpcResult(bindingResult.isSuccessful(), bindingResult.getErrors()); } - + @Override public Future> forwardToDomBroker(DataObject input) { return Futures.immediateFuture(null); } } + + private class NoOutputInvocationStrategy extends RpcInvocationStrategy { + + + @SuppressWarnings("rawtypes") + private WeakReference inputClass; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public NoOutputInvocationStrategy(QName rpc, Method targetMethod, + Class inputClass) { + super(rpc,targetMethod); + this.inputClass = new WeakReference(inputClass); + } + + + @Override + public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { + DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); + Future> result = (Future>) targetMethod.invoke(rpcService, bindingInput); + if (result == null) { + return Rpcs.getRpcResult(false); + } + RpcResult bindingResult = result.get(); + return Rpcs.getRpcResult(true); + } + + @Override + public Future> forwardToDomBroker(DataObject input) { + if(biRouter != null) { + CompositeNode xml = mappingService.toDataDom(input); + CompositeNode wrappedXml = ImmutableCompositeNode.create(rpc,ImmutableList.>of(xml)); + RpcResult result = biRouter.invokeRpc(rpc, wrappedXml); + Object baResultValue = null; + RpcResult baResult = Rpcs.getRpcResult(result.isSuccessful(), null, result.getErrors()); + return Futures.>immediateFuture(baResult); + } + return Futures.>immediateFuture(Rpcs.getRpcResult(false)); + } + + } public boolean isRpcForwarding() { return rpcForwarding; @@ -810,11 +887,60 @@ public class BindingIndependentConnector implements // } public boolean isNotificationForwarding() { - // TODO Auto-generated method stub return notificationForwarding; } public BindingIndependentMappingService getMappingService() { return mappingService; } + + public void setBindingNotificationService(NotificationProviderService baService) { + this.baNotifyService = baService; + + } + + public void setDomNotificationService(NotificationPublishService domService) { + this.domNotificationService = domService; + } + + private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener { + + private ConcurrentMap>> notifications = new ConcurrentHashMap<>(); + private Set supportedNotifications = new HashSet<>(); + + @Override + public Set getSupportedNotifications() { + return Collections.unmodifiableSet(supportedNotifications); + } + + @Override + public void onNotification(CompositeNode notification) { + QName qname = notification.getNodeType(); + WeakReference> potential = notifications.get(qname); + if (potential != null) { + Class potentialClass = potential.get(); + if (potentialClass != null) { + final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass, + notification); + + if (baNotification instanceof Notification) { + baNotifyService.publish((Notification) baNotification); + } + } + } + } + + @Override + public void onNotificationSubscribtion(Class notificationType) { + QName qname = BindingReflections.findQName(notificationType); + if (qname != null) { + WeakReference> already = notifications.putIfAbsent(qname, + new WeakReference>(notificationType)); + if (already == null) { + domNotificationService.addNotificationListener(qname, this); + supportedNotifications.add(qname); + } + } + } + } }